Merge pull request #65335 from shyamjvs/add-scheduler-profiling-to-testing

Automatic merge from submit-queue (batch tested with PRs 65339, 65343, 65324, 65335, 65367). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Introduce scheduler CPU/Memory profile-gathering in density test

This should help us get more reliable/realistic data for the scheduler from our real-cluster scalability tests.
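
Roughly, the density test now drives the generalized profile gatherers as in the sketch below. This is illustrative only: it assumes the usual `time`, `sync`, and `test/e2e/framework` imports, and `nodeCount` plus the saturation step are placeholders (see the diff for the real wiring).

```go
// Sketch: scheduler profiling around the cluster-saturation phase.
func profileSchedulerDuringSaturation(nodeCount int, runSaturation func()) {
	// Poll the scheduler CPU profile while pods are being created;
	// the polling interval scales with cluster size.
	interval := time.Duration(1+nodeCount/100) * time.Minute
	stopCh := framework.StartCPUProfileGatherer("kube-scheduler", "density", interval)

	runSaturation()
	close(stopCh)

	// One-off scheduler memory (heap) profile once saturation is done.
	var wg sync.WaitGroup
	wg.Add(1)
	framework.GatherMemoryProfile("kube-scheduler", "density", &wg)
	wg.Wait()
}
```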

/cc @wojtek-t 
fyi - @davidopp @bsalamat @misterikkit 

```release-note
NONE
```
Kubernetes Submit Queue 2018-06-22 10:31:20 -07:00 committed by GitHub
commit 5880db4a65
3 changed files with 78 additions and 50 deletions


@@ -61,25 +61,54 @@ func checkProfileGatheringPrerequisites() error {
return nil
}
func gatherProfileOfKind(profileBaseName, kind string) error {
func getPortForComponent(componentName string) (int, error) {
switch componentName {
case "kube-apiserver":
return 8080, nil
case "kube-scheduler":
return 10251, nil
case "kube-controller-manager":
return 10252, nil
}
return -1, fmt.Errorf("Port for component %v unknown", componentName)
}
// Gathers profiles from a master component through SSH. E.g usages:
// - gatherProfile("kube-apiserver", "someTest", "heap")
// - gatherProfile("kube-scheduler", "someTest", "profile")
// - gatherProfile("kube-controller-manager", "someTest", "profile?seconds=20")
//
// We don't export this method but wrappers around it (see below).
func gatherProfile(componentName, profileBaseName, profileKind string) error {
if err := checkProfileGatheringPrerequisites(); err != nil {
return fmt.Errorf("Profile gathering pre-requisite failed: %v", err)
}
profilePort, err := getPortForComponent(componentName)
if err != nil {
return fmt.Errorf("Profile gathering failed finding component port: %v", err)
}
if profileBaseName == "" {
profileBaseName = time.Now().Format(time.RFC3339)
}
// Get the profile data over SSH.
getCommand := fmt.Sprintf("curl -s localhost:8080/debug/pprof/%s", kind)
getCommand := fmt.Sprintf("curl -s localhost:%v/debug/pprof/%s", profilePort, profileKind)
sshResult, err := SSH(getCommand, GetMasterHost()+":22", TestContext.Provider)
if err != nil {
return fmt.Errorf("Failed to execute curl command on master through SSH: %v", err)
}
var profilePrefix string
profilePrefix := componentName
switch {
case kind == "heap":
profilePrefix = "ApiserverMemoryProfile_"
case strings.HasPrefix(kind, "profile"):
profilePrefix = "ApiserverCPUProfile_"
case profileKind == "heap":
profilePrefix += "_MemoryProfile_"
case strings.HasPrefix(profileKind, "profile"):
profilePrefix += "_CPUProfile_"
default:
return fmt.Errorf("Unknown profile kind provided: %s", kind)
return fmt.Errorf("Unknown profile kind provided: %s", profileKind)
}
// Write the data to a file.
// Write the profile data to a file.
rawprofilePath := path.Join(getProfilesDirectoryPath(), profilePrefix+profileBaseName+".pprof")
rawprofile, err := os.Create(rawprofilePath)
if err != nil {
@@ -97,12 +126,12 @@ func gatherProfileOfKind(profileBaseName, kind string) error {
var cmd *exec.Cmd
switch {
// TODO: Support other profile kinds if needed (e.g inuse_space, alloc_objects, mutex, etc)
case kind == "heap":
case profileKind == "heap":
cmd = exec.Command("go", "tool", "pprof", "-pdf", "-symbolize=none", "--alloc_space", rawprofile.Name())
case strings.HasPrefix(kind, "profile"):
case strings.HasPrefix(profileKind, "profile"):
cmd = exec.Command("go", "tool", "pprof", "-pdf", "-symbolize=none", rawprofile.Name())
default:
return fmt.Errorf("Unknown profile kind provided: %s", kind)
return fmt.Errorf("Unknown profile kind provided: %s", profileKind)
}
outfilePath := path.Join(getProfilesDirectoryPath(), profilePrefix+profileBaseName+".pdf")
outfile, err := os.Create(outfilePath)
@@ -124,67 +153,53 @@ func gatherProfileOfKind(profileBaseName, kind string) error {
// finish before the parent goroutine itself finishes, we accept a sync.WaitGroup
// argument in these functions. Typically you would use the following pattern:
//
// func TestFooBar() {
// func TestFoo() {
// var wg sync.WaitGroup
// wg.Add(3)
// go framework.GatherApiserverCPUProfile(&wg, "doing_foo")
// go framework.GatherApiserverMemoryProfile(&wg, "doing_foo")
// go framework.GatherCPUProfile("kube-apiserver", "before_foo", &wg)
// go framework.GatherMemoryProfile("kube-apiserver", "before_foo", &wg)
// <<<< some code doing foo >>>>>>
// go framework.GatherApiserverCPUProfile(&wg, "doing_bar")
// <<<< some code doing bar >>>>>>
// go framework.GatherCPUProfile("kube-scheduler", "after_foo", &wg)
// wg.Wait()
// }
//
// If you do not wish to exercise the waiting logic, pass a nil value for the
// waitgroup argument instead. However, then you would be responsible for ensuring
// that the function finishes.
// that the function finishes. There's also a polling-based gatherer utility for
// CPU profiles available below.
func GatherApiserverCPUProfile(wg *sync.WaitGroup, profileBaseName string) {
GatherApiserverCPUProfileForNSeconds(wg, profileBaseName, DefaultCPUProfileSeconds)
func GatherCPUProfile(componentName string, profileBaseName string, wg *sync.WaitGroup) {
GatherCPUProfileForSeconds(componentName, profileBaseName, DefaultCPUProfileSeconds, wg)
}
func GatherApiserverCPUProfileForNSeconds(wg *sync.WaitGroup, profileBaseName string, n int) {
func GatherCPUProfileForSeconds(componentName string, profileBaseName string, seconds int, wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}
if err := checkProfileGatheringPrerequisites(); err != nil {
Logf("Profile gathering pre-requisite failed: %v", err)
return
}
if profileBaseName == "" {
profileBaseName = time.Now().Format(time.RFC3339)
}
if err := gatherProfileOfKind(profileBaseName, fmt.Sprintf("profile?seconds=%v", n)); err != nil {
Logf("Failed to gather apiserver CPU profile: %v", err)
if err := gatherProfile(componentName, profileBaseName, fmt.Sprintf("profile?seconds=%v", seconds)); err != nil {
Logf("Failed to gather %v CPU profile: %v", componentName, err)
}
}
func GatherApiserverMemoryProfile(wg *sync.WaitGroup, profileBaseName string) {
func GatherMemoryProfile(componentName string, profileBaseName string, wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}
if err := checkProfileGatheringPrerequisites(); err != nil {
Logf("Profile gathering pre-requisite failed: %v", err)
return
}
if profileBaseName == "" {
profileBaseName = time.Now().Format(time.RFC3339)
}
if err := gatherProfileOfKind(profileBaseName, "heap"); err != nil {
Logf("Failed to gather apiserver memory profile: %v", err)
if err := gatherProfile(componentName, profileBaseName, "heap"); err != nil {
Logf("Failed to gather %v memory profile: %v", componentName, err)
}
}
// StartApiserverCPUProfileGatherer is a polling-based gatherer of the apiserver's
// CPU profile. It takes the delay b/w consecutive gatherings as an argument and
// StartCPUProfileGatherer performs polling-based gathering of the component's CPU
// profile. It takes the interval b/w consecutive gatherings as an argument and
// starts the gathering goroutine. To stop the gatherer, close the returned channel.
func StartApiserverCPUProfileGatherer(delay time.Duration) chan struct{} {
func StartCPUProfileGatherer(componentName string, profileBaseName string, interval time.Duration) chan struct{} {
stopCh := make(chan struct{})
go func() {
for {
select {
case <-time.After(delay):
GatherApiserverCPUProfile(nil, "")
case <-time.After(interval):
GatherCPUProfile(componentName, profileBaseName+"_"+time.Now().Format(time.RFC3339), nil)
case <-stopCh:
return
}
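
The comment block above also mentions a fixed-duration variant and a nil-WaitGroup mode without showing them; a minimal sketch of both follows (component names, base names, and the 30-second duration are purely illustrative, and the usual imports are assumed):

```go
// Block until a 30-second controller-manager CPU profile has been written.
var wg sync.WaitGroup
wg.Add(1)
go framework.GatherCPUProfileForSeconds("kube-controller-manager", "during_my_test", 30, &wg)
wg.Wait()

// Fire-and-forget: a nil WaitGroup skips the waiting logic, so the caller is
// responsible for keeping the test alive until the profile has been gathered.
go framework.GatherMemoryProfile("kube-apiserver", "during_my_test", nil)
```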


@@ -64,6 +64,9 @@ var MaxContainerFailures = 0
// Maximum no. of missing measurements related to pod-startup that the test tolerates.
var MaxMissingPodStartupMeasurements = 0
// Number of nodes in the cluster (computed inside BeforeEach).
var nodeCount = 0
type DensityTestConfig struct {
Configs []testutils.RunObjectConfig
ClientSets []clientset.Interface
@@ -285,6 +288,11 @@ func runDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPhaseTi
replicationCtrlStartupPhase := testPhaseDurations.StartPhase(300, "saturation pods creation")
defer replicationCtrlStartupPhase.End()
// Start scheduler CPU profile-gatherer before we begin cluster saturation.
profileGatheringDelay := time.Duration(1+nodeCount/100) * time.Minute
schedulerProfilingStopCh := framework.StartCPUProfileGatherer("kube-scheduler", "density", profileGatheringDelay)
// Start all replication controllers.
startTime := time.Now()
wg := sync.WaitGroup{}
@@ -304,10 +312,16 @@ func runDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPhaseTi
wg.Wait()
startupTime := time.Since(startTime)
close(logStopCh)
close(schedulerProfilingStopCh)
framework.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime)
framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second))
replicationCtrlStartupPhase.End()
// Grabbing scheduler memory profile after cluster saturation finished.
wg.Add(1)
framework.GatherMemoryProfile("kube-scheduler", "density", &wg)
wg.Wait()
printPodAllocationPhase := testPhaseDurations.StartPhase(400, "printing pod allocation")
defer printPodAllocationPhase.End()
// Print some data about Pod to Node allocation
@@ -366,7 +380,6 @@ func cleanupDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPha
// limits on Docker's concurrent container startup.
var _ = SIGDescribe("Density", func() {
var c clientset.Interface
var nodeCount int
var additionalPodsPrefix string
var ns string
var uuid string
@@ -388,7 +401,7 @@ var _ = SIGDescribe("Density", func() {
close(profileGathererStopCh)
wg := sync.WaitGroup{}
wg.Add(1)
framework.GatherApiserverMemoryProfile(&wg, "density")
framework.GatherMemoryProfile("kube-apiserver", "density", &wg)
wg.Wait()
saturationThreshold := time.Duration((totalPods / MinPodsPerSecondThroughput)) * time.Second
@@ -487,7 +500,7 @@ var _ = SIGDescribe("Density", func() {
// Start apiserver CPU profile gatherer with frequency based on cluster size.
profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
profileGathererStopCh = framework.StartApiserverCPUProfileGatherer(profileGatheringDelay)
profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "density", profileGatheringDelay)
})
type Density struct {
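
For a sense of the resulting polling frequencies: the intervals above use integer division on the node count, so with illustrative cluster sizes the numbers work out as follows.

```go
// Illustrative only: intervals for a 5000-node cluster (integer division).
nodeCount := 5000
apiserverInterval := time.Duration(5+nodeCount/100) * time.Minute // 55 minutes
schedulerInterval := time.Duration(1+nodeCount/100) * time.Minute // 51 minutes
_, _ = apiserverInterval, schedulerInterval
// At 100 nodes these drop to 6 minutes and 2 minutes respectively.
```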


@@ -106,7 +106,7 @@ var _ = SIGDescribe("Load capacity", func() {
close(profileGathererStopCh)
wg := sync.WaitGroup{}
wg.Add(1)
framework.GatherApiserverMemoryProfile(&wg, "load")
framework.GatherMemoryProfile("kube-apiserver", "load", &wg)
wg.Wait()
// Verify latency metrics
@@ -159,7 +159,7 @@ var _ = SIGDescribe("Load capacity", func() {
// Start apiserver CPU profile gatherer with frequency based on cluster size.
profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
profileGathererStopCh = framework.StartApiserverCPUProfileGatherer(profileGatheringDelay)
profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "load", profileGatheringDelay)
})
type Load struct {