From 457548ef7d016c05280f4840c280cc8ee81231c0 Mon Sep 17 00:00:00 2001
From: Shyam Jeedigunta
Date: Thu, 21 Jun 2018 18:13:58 +0200
Subject: [PATCH 1/2] Refactor profile-gatherer to work across all master components

---
 test/e2e/framework/profile_gatherer.go | 105 ++++++++++++++-----------
 test/e2e/scalability/density.go        |   4 +-
 test/e2e/scalability/load.go           |   4 +-
 3 files changed, 64 insertions(+), 49 deletions(-)

diff --git a/test/e2e/framework/profile_gatherer.go b/test/e2e/framework/profile_gatherer.go
index c63795ec469..7c42a76ad0c 100644
--- a/test/e2e/framework/profile_gatherer.go
+++ b/test/e2e/framework/profile_gatherer.go
@@ -61,25 +61,54 @@ func checkProfileGatheringPrerequisites() error {
 	return nil
 }
 
-func gatherProfileOfKind(profileBaseName, kind string) error {
+func getPortForComponent(componentName string) (int, error) {
+	switch componentName {
+	case "kube-apiserver":
+		return 8080, nil
+	case "kube-scheduler":
+		return 10251, nil
+	case "kube-controller-manager":
+		return 10252, nil
+	}
+	return -1, fmt.Errorf("Port for component %v unknown", componentName)
+}
+
+// Gathers profiles from a master component through SSH. E.g usages:
+// - gatherProfile("kube-apiserver", "someTest", "heap")
+// - gatherProfile("kube-scheduler", "someTest", "profile")
+// - gatherProfile("kube-controller-manager", "someTest", "profile?seconds=20")
+//
+// We don't export this method but wrappers around it (see below).
+func gatherProfile(componentName, profileBaseName, profileKind string) error {
+	if err := checkProfileGatheringPrerequisites(); err != nil {
+		return fmt.Errorf("Profile gathering pre-requisite failed: %v", err)
+	}
+	profilePort, err := getPortForComponent(componentName)
+	if err != nil {
+		return fmt.Errorf("Profile gathering failed finding component port: %v", err)
+	}
+	if profileBaseName == "" {
+		profileBaseName = time.Now().Format(time.RFC3339)
+	}
+
 	// Get the profile data over SSH.
-	getCommand := fmt.Sprintf("curl -s localhost:8080/debug/pprof/%s", kind)
+	getCommand := fmt.Sprintf("curl -s localhost:%v/debug/pprof/%s", profilePort, profileKind)
 	sshResult, err := SSH(getCommand, GetMasterHost()+":22", TestContext.Provider)
 	if err != nil {
 		return fmt.Errorf("Failed to execute curl command on master through SSH: %v", err)
 	}
 
-	var profilePrefix string
+	profilePrefix := componentName
 	switch {
-	case kind == "heap":
-		profilePrefix = "ApiserverMemoryProfile_"
-	case strings.HasPrefix(kind, "profile"):
-		profilePrefix = "ApiserverCPUProfile_"
+	case profileKind == "heap":
+		profilePrefix += "_MemoryProfile_"
+	case strings.HasPrefix(profileKind, "profile"):
+		profilePrefix += "_CPUProfile_"
 	default:
-		return fmt.Errorf("Unknown profile kind provided: %s", kind)
+		return fmt.Errorf("Unknown profile kind provided: %s", profileKind)
 	}
 
-	// Write the data to a file.
+	// Write the profile data to a file.
 	rawprofilePath := path.Join(getProfilesDirectoryPath(), profilePrefix+profileBaseName+".pprof")
 	rawprofile, err := os.Create(rawprofilePath)
 	if err != nil {
@@ -97,12 +126,12 @@ func gatherProfileOfKind(profileBaseName, kind string) error {
 	var cmd *exec.Cmd
 	switch {
 	// TODO: Support other profile kinds if needed (e.g inuse_space, alloc_objects, mutex, etc)
-	case kind == "heap":
+	case profileKind == "heap":
 		cmd = exec.Command("go", "tool", "pprof", "-pdf", "-symbolize=none", "--alloc_space", rawprofile.Name())
-	case strings.HasPrefix(kind, "profile"):
+	case strings.HasPrefix(profileKind, "profile"):
 		cmd = exec.Command("go", "tool", "pprof", "-pdf", "-symbolize=none", rawprofile.Name())
 	default:
-		return fmt.Errorf("Unknown profile kind provided: %s", kind)
+		return fmt.Errorf("Unknown profile kind provided: %s", profileKind)
 	}
 	outfilePath := path.Join(getProfilesDirectoryPath(), profilePrefix+profileBaseName+".pdf")
 	outfile, err := os.Create(outfilePath)
 	if err != nil {
@@ -124,67 +153,53 @@ func gatherProfileOfKind(profileBaseName, kind string) error {
 // finish before the parent goroutine itself finishes, we accept a sync.WaitGroup
 // argument in these functions. Typically you would use the following pattern:
 //
-// func TestFooBar() {
+// func TestFoo() {
 //     var wg sync.WaitGroup
 //     wg.Add(3)
-//     go framework.GatherApiserverCPUProfile(&wg, "doing_foo")
-//     go framework.GatherApiserverMemoryProfile(&wg, "doing_foo")
+//     go framework.GatherCPUProfile("kube-apiserver", "before_foo", &wg)
+//     go framework.GatherMemoryProfile("kube-apiserver", "before_foo", &wg)
 //     <<<< some code doing foo >>>>>>
-//     go framework.GatherApiserverCPUProfile(&wg, "doing_bar")
-//     <<<< some code doing bar >>>>>>
+//     go framework.GatherCPUProfile("kube-scheduler", "after_foo", &wg)
 //     wg.Wait()
 // }
 //
 // If you do not wish to exercise the waiting logic, pass a nil value for the
 // waitgroup argument instead. However, then you would be responsible for ensuring
-// that the function finishes.
+// that the function finishes. There's also a polling-based gatherer utility for
+// CPU profiles available below.
-func GatherApiserverCPUProfile(wg *sync.WaitGroup, profileBaseName string) {
-	GatherApiserverCPUProfileForNSeconds(wg, profileBaseName, DefaultCPUProfileSeconds)
+func GatherCPUProfile(componentName string, profileBaseName string, wg *sync.WaitGroup) {
+	GatherCPUProfileForSeconds(componentName, profileBaseName, DefaultCPUProfileSeconds, wg)
 }
 
-func GatherApiserverCPUProfileForNSeconds(wg *sync.WaitGroup, profileBaseName string, n int) {
+func GatherCPUProfileForSeconds(componentName string, profileBaseName string, seconds int, wg *sync.WaitGroup) {
 	if wg != nil {
 		defer wg.Done()
 	}
-	if err := checkProfileGatheringPrerequisites(); err != nil {
-		Logf("Profile gathering pre-requisite failed: %v", err)
-		return
-	}
-	if profileBaseName == "" {
-		profileBaseName = time.Now().Format(time.RFC3339)
-	}
-	if err := gatherProfileOfKind(profileBaseName, fmt.Sprintf("profile?seconds=%v", n)); err != nil {
-		Logf("Failed to gather apiserver CPU profile: %v", err)
+	if err := gatherProfile(componentName, profileBaseName, fmt.Sprintf("profile?seconds=%v", seconds)); err != nil {
+		Logf("Failed to gather %v CPU profile: %v", componentName, err)
 	}
 }
 
-func GatherApiserverMemoryProfile(wg *sync.WaitGroup, profileBaseName string) {
+func GatherMemoryProfile(componentName string, profileBaseName string, wg *sync.WaitGroup) {
 	if wg != nil {
 		defer wg.Done()
 	}
-	if err := checkProfileGatheringPrerequisites(); err != nil {
-		Logf("Profile gathering pre-requisite failed: %v", err)
-		return
-	}
-	if profileBaseName == "" {
-		profileBaseName = time.Now().Format(time.RFC3339)
-	}
-	if err := gatherProfileOfKind(profileBaseName, "heap"); err != nil {
-		Logf("Failed to gather apiserver memory profile: %v", err)
+	if err := gatherProfile(componentName, profileBaseName, "heap"); err != nil {
+		Logf("Failed to gather %v memory profile: %v", componentName, err)
 	}
 }
 
-// StartApiserverCPUProfileGatherer is a polling-based gatherer of the apiserver's
-// CPU profile. It takes the delay b/w consecutive gatherings as an argument and
+// StartCPUProfileGatherer performs polling-based gathering of the component's CPU
+// profile. It takes the interval b/w consecutive gatherings as an argument and
 // starts the gathering goroutine. To stop the gatherer, close the returned channel.
-func StartApiserverCPUProfileGatherer(delay time.Duration) chan struct{} {
+func StartCPUProfileGatherer(componentName string, profileBaseName string, interval time.Duration) chan struct{} {
 	stopCh := make(chan struct{})
 	go func() {
 		for {
 			select {
-			case <-time.After(delay):
-				GatherApiserverCPUProfile(nil, "")
+			case <-time.After(interval):
+				GatherCPUProfile(componentName, profileBaseName+"_"+time.Now().Format(time.RFC3339), nil)
 			case <-stopCh:
 				return
 			}
diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go
index 30df63e7afe..f2e6e3d2e94 100644
--- a/test/e2e/scalability/density.go
+++ b/test/e2e/scalability/density.go
@@ -388,7 +388,7 @@ var _ = SIGDescribe("Density", func() {
 		close(profileGathererStopCh)
 		wg := sync.WaitGroup{}
 		wg.Add(1)
-		framework.GatherApiserverMemoryProfile(&wg, "density")
+		framework.GatherMemoryProfile("kube-apiserver", "density", &wg)
 		wg.Wait()
 
 		saturationThreshold := time.Duration((totalPods / MinPodsPerSecondThroughput)) * time.Second
@@ -487,7 +487,7 @@ var _ = SIGDescribe("Density", func() {
 
 		// Start apiserver CPU profile gatherer with frequency based on cluster size.
 		profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
-		profileGathererStopCh = framework.StartApiserverCPUProfileGatherer(profileGatheringDelay)
+		profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "density", profileGatheringDelay)
 	})
 
 	type Density struct {
diff --git a/test/e2e/scalability/load.go b/test/e2e/scalability/load.go
index 5dadeed360f..cd977c628d4 100644
--- a/test/e2e/scalability/load.go
+++ b/test/e2e/scalability/load.go
@@ -106,7 +106,7 @@ var _ = SIGDescribe("Load capacity", func() {
 		close(profileGathererStopCh)
 		wg := sync.WaitGroup{}
 		wg.Add(1)
-		framework.GatherApiserverMemoryProfile(&wg, "load")
+		framework.GatherMemoryProfile("kube-apiserver", "load", &wg)
 		wg.Wait()
 
 		// Verify latency metrics
@@ -159,7 +159,7 @@ var _ = SIGDescribe("Load capacity", func() {
 
 		// Start apiserver CPU profile gatherer with frequency based on cluster size.
 		profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
-		profileGathererStopCh = framework.StartApiserverCPUProfileGatherer(profileGatheringDelay)
+		profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "load", profileGatheringDelay)
 	})
 
 	type Load struct {

From 0c787703f55a010ed0d2e8c03128fdc5fe3a348a Mon Sep 17 00:00:00 2001
From: Shyam Jeedigunta
Date: Thu, 21 Jun 2018 20:53:19 +0200
Subject: [PATCH 2/2] Introduce scheduler CPU/Memory profile-gathering in density test

---
 test/e2e/scalability/density.go | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go
index f2e6e3d2e94..b3803d983c4 100644
--- a/test/e2e/scalability/density.go
+++ b/test/e2e/scalability/density.go
@@ -64,6 +64,9 @@ var MaxContainerFailures = 0
 // Maximum no. of missing measurements related to pod-startup that the test tolerates.
 var MaxMissingPodStartupMeasurements = 0
 
+// Number of nodes in the cluster (computed inside BeforeEach).
+var nodeCount = 0
+
 type DensityTestConfig struct {
 	Configs    []testutils.RunObjectConfig
 	ClientSets []clientset.Interface
@@ -285,6 +288,11 @@ func runDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPhaseTi
 
 	replicationCtrlStartupPhase := testPhaseDurations.StartPhase(300, "saturation pods creation")
 	defer replicationCtrlStartupPhase.End()
+
+	// Start scheduler CPU profile-gatherer before we begin cluster saturation.
+	profileGatheringDelay := time.Duration(1+nodeCount/100) * time.Minute
+	schedulerProfilingStopCh := framework.StartCPUProfileGatherer("kube-scheduler", "density", profileGatheringDelay)
+
 	// Start all replication controllers.
 	startTime := time.Now()
 	wg := sync.WaitGroup{}
@@ -304,10 +312,16 @@ func runDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPhaseTi
 	wg.Wait()
 	startupTime := time.Since(startTime)
 	close(logStopCh)
+	close(schedulerProfilingStopCh)
 	framework.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime)
 	framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second))
 	replicationCtrlStartupPhase.End()
 
+	// Grabbing scheduler memory profile after cluster saturation finished.
+	wg.Add(1)
+	framework.GatherMemoryProfile("kube-scheduler", "density", &wg)
+	wg.Wait()
+
 	printPodAllocationPhase := testPhaseDurations.StartPhase(400, "printing pod allocation")
 	defer printPodAllocationPhase.End()
 	// Print some data about Pod to Node allocation
@@ -366,7 +380,6 @@ func cleanupDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPha
 // limits on Docker's concurrent container startup.
 var _ = SIGDescribe("Density", func() {
 	var c clientset.Interface
-	var nodeCount int
 	var additionalPodsPrefix string
 	var ns string
 	var uuid string
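A couple of usage sketches for the refactored gatherer follow; none of this code is part of the patch itself. The first mirrors the WaitGroup pattern documented in the comment above: a test in the e2e tree fires off one-shot gatherings for several master components in parallel and waits for all of them. The package name, the helper, and its label argument are made up for illustration; the framework functions and their signatures are the ones introduced by the patch.

package e2etest // hypothetical test package under test/e2e

import (
	"sync"

	"k8s.io/kubernetes/test/e2e/framework"
)

// gatherMasterProfiles grabs one-shot profiles from several master components
// in parallel and blocks until all of the profile files have been written.
func gatherMasterProfiles(label string) {
	var wg sync.WaitGroup
	wg.Add(3)
	go framework.GatherCPUProfile("kube-apiserver", label, &wg)
	go framework.GatherMemoryProfile("kube-apiserver", label, &wg)
	// Use a longer 30s CPU window for the controller-manager instead of
	// DefaultCPUProfileSeconds.
	go framework.GatherCPUProfileForSeconds("kube-controller-manager", label, 30, &wg)
	wg.Wait()
}

Calling gatherMasterProfiles("before_load") would, with this patch, leave files named like kube-apiserver_CPUProfile_before_load.pdf and kube-apiserver_MemoryProfile_before_load.pdf in the framework's profiles directory, since gatherProfile builds the prefix from the component name and the profile kind.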
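The second commit follows a start/stop pattern around the saturation phase that other scale tests could reuse: start the polling CPU gatherer before a heavy phase, close the returned channel when the phase ends, then take a single heap snapshot. Below is a rough sketch of that pattern, with runWorkload, nodeCount, and the "myTest" label standing in for whatever the caller already has.

package e2etest // hypothetical test package under test/e2e

import (
	"sync"
	"time"

	"k8s.io/kubernetes/test/e2e/framework"
)

func profileSchedulerDuring(nodeCount int, runWorkload func()) {
	// Same heuristic as the density test: bigger clusters get profiled less often.
	interval := time.Duration(1+nodeCount/100) * time.Minute
	stopCh := framework.StartCPUProfileGatherer("kube-scheduler", "myTest", interval)

	runWorkload()

	// Stop the periodic CPU profiling, then grab one memory profile.
	close(stopCh)
	wg := sync.WaitGroup{}
	wg.Add(1)
	framework.GatherMemoryProfile("kube-scheduler", "myTest", &wg)
	wg.Wait()
}

Because StartCPUProfileGatherer appends an RFC3339 timestamp to the base name on every poll, each periodic snapshot lands in its own file rather than overwriting the previous one.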
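For poking at the pprof endpoints by hand, the sketch below is roughly what gatherProfile does once the SSH layer is stripped away: pull a heap profile from the scheduler's local pprof port and render it as a PDF with go tool pprof. It assumes it runs on the master node itself, that the insecure ports listed in getPortForComponent (8080/10251/10252) are serving /debug/pprof, and that graphviz is installed for the PDF step; the output file names are arbitrary.

package main

import (
	"io"
	"log"
	"net/http"
	"os"
	"os/exec"
)

func main() {
	// 10251 is kube-scheduler in the port table above; use 8080 for the
	// apiserver or 10252 for the controller-manager instead.
	resp, err := http.Get("http://localhost:10251/debug/pprof/heap")
	if err != nil {
		log.Fatalf("fetching heap profile: %v", err)
	}
	defer resp.Body.Close()

	raw, err := os.Create("kube-scheduler_MemoryProfile.pprof")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.Copy(raw, resp.Body); err != nil {
		log.Fatalf("writing raw profile: %v", err)
	}
	raw.Close()

	// The same conversion the framework performs for heap profiles.
	out, err := os.Create("kube-scheduler_MemoryProfile.pdf")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()
	cmd := exec.Command("go", "tool", "pprof", "-pdf", "-symbolize=none", "--alloc_space", raw.Name())
	cmd.Stdout = out
	if err := cmd.Run(); err != nil {
		log.Fatalf("go tool pprof: %v", err)
	}
	log.Printf("wrote %s", out.Name())
}

For a CPU profile, fetch /debug/pprof/profile?seconds=30 instead and drop the --alloc_space flag, matching the two cases the framework distinguishes.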