Merge pull request #53541 from jiayingz/e2e-stats

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Extend test/e2e/scheduling/nvidia-gpus.go to track resource usage of the installer and device plugin containers.
To support this, export certain functions and fields in framework/resource_usage_gatherer.go so that any e2e test can track resource usage of a specified set of pods with a specified probe interval and duration.



**What this PR does / why we need it**:
We need to quantify the resource usage of the device plugin DaemonSet to make sure it can run reliably on nodes with GPUs.
We also want to measure GPU driver installer resource usage to track any unexpected resource consumption during driver installation.
For the latter, see the related issue https://github.com/kubernetes/features/issues/368.

Example resource summary output (the top-level keys are usage percentiles; `Cpu` is in cores and `Mem` in bytes):
```
Oct  6 12:35:07.289: INFO: Printing summary: ResourceUsageSummary
Oct  6 12:35:07.289: INFO: ResourceUsageSummary JSON
{
  "100": [
    {
      "Name": "nvidia-device-plugin-6kqxp/nvidia-device-plugin",
      "Cpu": 0.000507167,
      "Mem": 2134016
    },
    {
      "Name": "nvidia-device-plugin-6kqxp/nvidia-driver-installer",
      "Cpu": 1.915508718,
      "Mem": 663330816
    },
    {
      "Name": "nvidia-device-plugin-l28zc/nvidia-device-plugin",
      "Cpu": 0.000836256,
      "Mem": 2211840
    },
    {
      "Name": "nvidia-device-plugin-l28zc/nvidia-driver-installer",
      "Cpu": 1.916886293,
      "Mem": 691449856
    },
    {
      "Name": "nvidia-device-plugin-xb4vh/nvidia-device-plugin",
      "Cpu": 0.000515103,
      "Mem": 2265088
    },
    {
      "Name": "nvidia-device-plugin-xb4vh/nvidia-driver-installer",
      "Cpu": 1.909435982,
      "Mem": 832430080
    }
  ],
  "50": [
    {
...
```

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note
```
Kubernetes Submit Queue 2017-11-13 21:51:16 -08:00 committed by GitHub
commit 710523ed7d
5 changed files with 111 additions and 68 deletions


```diff
@@ -203,13 +203,15 @@ func (f *Framework) BeforeEach() {
 	if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" {
 		var err error
 		f.gatherer, err = NewResourceUsageGatherer(f.ClientSet, ResourceGathererOptions{
-			inKubemark: ProviderIs("kubemark"),
-			masterOnly: TestContext.GatherKubeSystemResourceUsageData == "master",
-		})
+			InKubemark:                  ProviderIs("kubemark"),
+			MasterOnly:                  TestContext.GatherKubeSystemResourceUsageData == "master",
+			ResourceDataGatheringPeriod: 60 * time.Second,
+			ProbeDuration:               5 * time.Second,
+		}, nil)
 		if err != nil {
 			Logf("Error while creating NewResourceUsageGatherer: %v", err)
 		} else {
-			go f.gatherer.startGatheringData()
+			go f.gatherer.StartGatheringData()
 		}
 	}
@@ -319,7 +321,7 @@ func (f *Framework) AfterEach() {
 	if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" && f.gatherer != nil {
 		By("Collecting resource usage data")
-		summary, resourceViolationError := f.gatherer.stopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints)
+		summary, resourceViolationError := f.gatherer.StopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints)
 		defer ExpectNoError(resourceViolationError)
 		f.TestSummaries = append(f.TestSummaries, summary)
 	}
```


```diff
@@ -27,17 +27,13 @@ import (
 	"text/tabwriter"
 	"time"
 
+	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/util/system"
 )
 
-const (
-	resourceDataGatheringPeriod = 60 * time.Second
-	probeDuration               = 15 * time.Second
-)
-
 type ResourceConstraint struct {
 	CPUConstraint    float64
 	MemoryConstraint uint64
@@ -139,6 +135,8 @@ type resourceGatherWorker struct {
 	dataSeries []ResourceUsagePerContainer
 	finished   bool
 	inKubemark bool
+	resourceDataGatheringPeriod time.Duration
+	probeDuration               time.Duration
 }
 
 func (w *resourceGatherWorker) singleProbe() {
@@ -156,13 +154,14 @@ func (w *resourceGatherWorker) singleProbe() {
 			}
 		}
 	} else {
-		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs })
+		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
 		if err != nil {
 			Logf("Error while reading data from %v: %v", w.nodeName, err)
 			return
 		}
 		for k, v := range nodeUsage {
 			data[k] = v
+			Logf("Get container %v usage on node %v. CPUUsageInCores: %v, MemoryUsageInBytes: %v, MemoryWorkingSetInBytes: %v", k, w.nodeName, v.CPUUsageInCores, v.MemoryUsageInBytes, v.MemoryWorkingSetInBytes)
 		}
 	}
 	w.dataSeries = append(w.dataSeries, data)
@@ -178,7 +177,7 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 	w.singleProbe()
 	for {
 		select {
-		case <-time.After(resourceDataGatheringPeriod):
+		case <-time.After(w.resourceDataGatheringPeriod):
 			w.singleProbe()
 		case <-w.stopCh:
 			return
@@ -189,19 +188,6 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 	}
 }
 
-func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c clientset.Interface) {
-	if len(g.workers) == 0 {
-		return
-	}
-	delayPeriod := resourceDataGatheringPeriod / time.Duration(len(g.workers))
-	delay := time.Duration(0)
-	for i := range g.workers {
-		go g.workers[i].gather(delay)
-		delay += delayPeriod
-	}
-	g.workerWg.Wait()
-}
-
 type containerResourceGatherer struct {
 	client       clientset.Interface
 	stopCh       chan struct{}
@@ -212,11 +198,13 @@ type containerResourceGatherer struct {
 }
 
 type ResourceGathererOptions struct {
-	inKubemark bool
-	masterOnly bool
+	InKubemark                  bool
+	MasterOnly                  bool
+	ResourceDataGatheringPeriod time.Duration
+	ProbeDuration               time.Duration
 }
 
-func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions) (*containerResourceGatherer, error) {
+func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*containerResourceGatherer, error) {
 	g := containerResourceGatherer{
 		client: c,
 		stopCh: make(chan struct{}),
@@ -224,7 +212,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 		options: options,
 	}
 
-	if options.inKubemark {
+	if options.InKubemark {
 		g.workerWg.Add(1)
 		g.workers = append(g.workers, resourceGatherWorker{
 			inKubemark: true,
@@ -233,12 +221,19 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 			finished:   false,
 		})
 	} else {
-		pods, err := c.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
+		// Tracks kube-system pods if no valid PodList is passed in.
+		var err error
+		if pods == nil {
+			pods, err = c.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
 			if err != nil {
 				Logf("Error while listing Pods: %v", err)
 				return nil, err
 			}
+		}
 		for _, pod := range pods.Items {
+			for _, container := range pod.Status.InitContainerStatuses {
+				g.containerIDs = append(g.containerIDs, container.Name)
+			}
 			for _, container := range pod.Status.ContainerStatuses {
 				g.containerIDs = append(g.containerIDs, container.Name)
 			}
@@ -250,7 +245,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 		}
 		for _, node := range nodeList.Items {
-			if !options.masterOnly || system.IsMasterNode(node.Name) {
+			if !options.MasterOnly || system.IsMasterNode(node.Name) {
 				g.workerWg.Add(1)
 				g.workers = append(g.workers, resourceGatherWorker{
 					c: c,
@@ -260,8 +255,10 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 					stopCh:     g.stopCh,
 					finished:   false,
 					inKubemark: false,
+					resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod,
+					probeDuration:               options.ProbeDuration,
 				})
-				if options.masterOnly {
+				if options.MasterOnly {
 					break
 				}
 			}
@@ -270,12 +267,26 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 	return &g, nil
 }
 
-// startGatheringData blocks until stopAndSummarize is called.
-func (g *containerResourceGatherer) startGatheringData() {
-	g.getKubeSystemContainersResourceUsage(g.client)
+// StartGatheringData starts a stat gathering worker blocks for each node to track,
+// and blocks until StopAndSummarize is called.
+func (g *containerResourceGatherer) StartGatheringData() {
+	if len(g.workers) == 0 {
+		return
+	}
+	delayPeriod := g.options.ResourceDataGatheringPeriod / time.Duration(len(g.workers))
+	delay := time.Duration(0)
+	for i := range g.workers {
+		go g.workers[i].gather(delay)
+		delay += delayPeriod
+	}
+	g.workerWg.Wait()
 }
 
-func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
+// StopAndSummarize stops stat gathering workers, processes the collected stats,
+// generates resource summary for the passed-in percentiles, and returns the summary.
+// It returns an error if the resource usage at any percentile is beyond the
+// specified resource constraints.
+func (g *containerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
 	close(g.stopCh)
 	Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
 	finished := make(chan struct{})
```


```diff
@@ -2729,6 +2729,19 @@ func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind s
 	return nil
 }
 
+// Wait up to PodListTimeout for getting pods of the specified controller name and return them.
+func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) {
+	rtObject, err := getRuntimeObjectForKind(c, kind, ns, name)
+	if err != nil {
+		return nil, err
+	}
+	selector, err := getSelectorFromRuntimeObject(rtObject)
+	if err != nil {
+		return nil, err
+	}
+	return WaitForPodsWithLabel(c, ns, selector)
+}
+
 // Returns true if all the specified pods are scheduled, else returns false.
 func podsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (bool, error) {
 	PodStore := testutil.NewPodStore(c, ns, label, fields.Everything())
```


```diff
@@ -26,6 +26,7 @@ go_library(
         "//pkg/api/v1/pod:go_default_library",
         "//pkg/apis/core:go_default_library",
         "//pkg/apis/core/v1/helper:go_default_library",
+        "//pkg/apis/extensions:go_default_library",
         "//pkg/quota/evaluator/core:go_default_library",
         "//pkg/util/system:go_default_library",
         "//pkg/util/version:go_default_library",
```


```diff
@@ -24,6 +24,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
+	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/test/e2e/framework"
 	imageutils "k8s.io/kubernetes/test/utils/image"
@@ -171,20 +172,28 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 		podCreationFunc = makeCudaAdditionTestPod
 	}
 
-	// GPU drivers might have already been installed.
-	if !areGPUsAvailableOnAllSchedulableNodes(f) {
-		// Install Nvidia Drivers.
+	// Creates the DaemonSet that installs Nvidia Drivers.
+	// The DaemonSet also runs nvidia device plugin for device plugin test.
 	ds, err := framework.DsFromManifest(dsYamlUrl)
 	Expect(err).NotTo(HaveOccurred())
 	ds.Namespace = f.Namespace.Name
 	_, err = f.ClientSet.ExtensionsV1beta1().DaemonSets(f.Namespace.Name).Create(ds)
 	framework.ExpectNoError(err, "failed to create daemonset")
-	framework.Logf("Successfully created daemonset to install Nvidia drivers. Waiting for drivers to be installed and GPUs to be available in Node Capacity...")
+	framework.Logf("Successfully created daemonset to install Nvidia drivers.")
+	pods, err := framework.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet"))
+	framework.ExpectNoError(err, "getting pods controlled by the daemonset")
+	framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.")
+	rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second}, pods)
+	framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods")
+	go rsgather.StartGatheringData()
 	// Wait for Nvidia GPUs to be available on nodes
+	framework.Logf("Waiting for drivers to be installed and GPUs to be available in Node Capacity...")
 	Eventually(func() bool {
 		return areGPUsAvailableOnAllSchedulableNodes(f)
 	}, driverInstallTimeout, time.Second).Should(BeTrue())
-	}
 	framework.Logf("Creating as many pods as there are Nvidia GPUs and have the pods run a CUDA app")
 	podList := []*v1.Pod{}
 	for i := int64(0); i < getGPUsAvailable(f); i++ {
@@ -195,6 +204,13 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 	for _, po := range podList {
 		f.PodClient().WaitForSuccess(po.Name, 5*time.Minute)
 	}
+	framework.Logf("Stopping ResourceUsageGather")
+	constraints := make(map[string]framework.ResourceConstraint)
+	// For now, just gets summary. Can pass valid constraints in the future.
+	summary, err := rsgather.StopAndSummarize([]int{50, 90, 100}, constraints)
+	f.TestSummaries = append(f.TestSummaries, summary)
+	framework.ExpectNoError(err, "getting resource usage summary")
 }
 
 var _ = SIGDescribe("[Feature:GPU]", func() {
```