From ae36f8ee95d23689ad2889c8b16025c999726a8f Mon Sep 17 00:00:00 2001 From: Jiaying Zhang Date: Tue, 26 Sep 2017 22:27:30 -0700 Subject: [PATCH] Extend test/e2e/scheduling/nvidia-gpus.go to track resource usage of installer and device plugin containers. To support this, exports certain functions and fields in framework/resource_usage_gatherer.go so that it can be used in any e2e test to track any specified pod resource usage with the specified probe interval and duration. --- test/e2e/framework/framework.go | 12 +- test/e2e/framework/resource_usage_gatherer.go | 109 ++++++++++-------- test/e2e/framework/util.go | 13 +++ test/e2e/scheduling/BUILD | 1 + test/e2e/scheduling/nvidia-gpus.go | 44 ++++--- 5 files changed, 111 insertions(+), 68 deletions(-) diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 7d6661b4cbb..d75bfb93f19 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -203,13 +203,15 @@ func (f *Framework) BeforeEach() { if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" { var err error f.gatherer, err = NewResourceUsageGatherer(f.ClientSet, ResourceGathererOptions{ - inKubemark: ProviderIs("kubemark"), - masterOnly: TestContext.GatherKubeSystemResourceUsageData == "master", - }) + InKubemark: ProviderIs("kubemark"), + MasterOnly: TestContext.GatherKubeSystemResourceUsageData == "master", + ResourceDataGatheringPeriod: 60 * time.Second, + ProbeDuration: 5 * time.Second, + }, nil) if err != nil { Logf("Error while creating NewResourceUsageGatherer: %v", err) } else { - go f.gatherer.startGatheringData() + go f.gatherer.StartGatheringData() } } @@ -319,7 +321,7 @@ func (f *Framework) AfterEach() { if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" && f.gatherer != nil { By("Collecting resource usage data") - summary, resourceViolationError := f.gatherer.stopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints) + summary, resourceViolationError := f.gatherer.StopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints) defer ExpectNoError(resourceViolationError) f.TestSummaries = append(f.TestSummaries, summary) } diff --git a/test/e2e/framework/resource_usage_gatherer.go b/test/e2e/framework/resource_usage_gatherer.go index 6ed5ff2bffe..6284bd17f67 100644 --- a/test/e2e/framework/resource_usage_gatherer.go +++ b/test/e2e/framework/resource_usage_gatherer.go @@ -27,17 +27,13 @@ import ( "text/tabwriter" "time" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/util/system" ) -const ( - resourceDataGatheringPeriod = 60 * time.Second - probeDuration = 15 * time.Second -) - type ResourceConstraint struct { CPUConstraint float64 MemoryConstraint uint64 @@ -131,14 +127,16 @@ func leftMergeData(left, right map[int]ResourceUsagePerContainer) map[int]Resour } type resourceGatherWorker struct { - c clientset.Interface - nodeName string - wg *sync.WaitGroup - containerIDs []string - stopCh chan struct{} - dataSeries []ResourceUsagePerContainer - finished bool - inKubemark bool + c clientset.Interface + nodeName string + wg *sync.WaitGroup + containerIDs []string + stopCh chan struct{} + dataSeries []ResourceUsagePerContainer + finished bool + inKubemark bool + resourceDataGatheringPeriod time.Duration + probeDuration time.Duration } func (w 
*resourceGatherWorker) singleProbe() { @@ -156,13 +154,14 @@ func (w *resourceGatherWorker) singleProbe() { } } } else { - nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs }) + nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs }) if err != nil { Logf("Error while reading data from %v: %v", w.nodeName, err) return } for k, v := range nodeUsage { data[k] = v + Logf("Get container %v usage on node %v. CPUUsageInCores: %v, MemoryUsageInBytes: %v, MemoryWorkingSetInBytes: %v", k, w.nodeName, v.CPUUsageInCores, v.MemoryUsageInBytes, v.MemoryWorkingSetInBytes) } } w.dataSeries = append(w.dataSeries, data) @@ -178,7 +177,7 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) { w.singleProbe() for { select { - case <-time.After(resourceDataGatheringPeriod): + case <-time.After(w.resourceDataGatheringPeriod): w.singleProbe() case <-w.stopCh: return @@ -189,19 +188,6 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) { } } -func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c clientset.Interface) { - if len(g.workers) == 0 { - return - } - delayPeriod := resourceDataGatheringPeriod / time.Duration(len(g.workers)) - delay := time.Duration(0) - for i := range g.workers { - go g.workers[i].gather(delay) - delay += delayPeriod - } - g.workerWg.Wait() -} - type containerResourceGatherer struct { client clientset.Interface stopCh chan struct{} @@ -212,11 +198,13 @@ type containerResourceGatherer struct { } type ResourceGathererOptions struct { - inKubemark bool - masterOnly bool + InKubemark bool + MasterOnly bool + ResourceDataGatheringPeriod time.Duration + ProbeDuration time.Duration } -func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions) (*containerResourceGatherer, error) { +func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*containerResourceGatherer, error) { g := containerResourceGatherer{ client: c, stopCh: make(chan struct{}), @@ -224,7 +212,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt options: options, } - if options.inKubemark { + if options.InKubemark { g.workerWg.Add(1) g.workers = append(g.workers, resourceGatherWorker{ inKubemark: true, @@ -233,12 +221,19 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt finished: false, }) } else { - pods, err := c.CoreV1().Pods("kube-system").List(metav1.ListOptions{}) - if err != nil { - Logf("Error while listing Pods: %v", err) - return nil, err + // Tracks kube-system pods if no valid PodList is passed in. 
+ var err error + if pods == nil { + pods, err = c.CoreV1().Pods("kube-system").List(metav1.ListOptions{}) + if err != nil { + Logf("Error while listing Pods: %v", err) + return nil, err + } } for _, pod := range pods.Items { + for _, container := range pod.Status.InitContainerStatuses { + g.containerIDs = append(g.containerIDs, container.Name) + } for _, container := range pod.Status.ContainerStatuses { g.containerIDs = append(g.containerIDs, container.Name) } @@ -250,18 +245,20 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt } for _, node := range nodeList.Items { - if !options.masterOnly || system.IsMasterNode(node.Name) { + if !options.MasterOnly || system.IsMasterNode(node.Name) { g.workerWg.Add(1) g.workers = append(g.workers, resourceGatherWorker{ - c: c, - nodeName: node.Name, - wg: &g.workerWg, - containerIDs: g.containerIDs, - stopCh: g.stopCh, - finished: false, - inKubemark: false, + c: c, + nodeName: node.Name, + wg: &g.workerWg, + containerIDs: g.containerIDs, + stopCh: g.stopCh, + finished: false, + inKubemark: false, + resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod, + probeDuration: options.ProbeDuration, }) - if options.masterOnly { + if options.MasterOnly { break } } @@ -270,12 +267,26 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt return &g, nil } -// startGatheringData blocks until stopAndSummarize is called. -func (g *containerResourceGatherer) startGatheringData() { - g.getKubeSystemContainersResourceUsage(g.client) +// StartGatheringData starts a stat gathering worker blocks for each node to track, +// and blocks until StopAndSummarize is called. +func (g *containerResourceGatherer) StartGatheringData() { + if len(g.workers) == 0 { + return + } + delayPeriod := g.options.ResourceDataGatheringPeriod / time.Duration(len(g.workers)) + delay := time.Duration(0) + for i := range g.workers { + go g.workers[i].gather(delay) + delay += delayPeriod + } + g.workerWg.Wait() } -func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) { +// StopAndSummarize stops stat gathering workers, processes the collected stats, +// generates resource summary for the passed-in percentiles, and returns the summary. +// It returns an error if the resource usage at any percentile is beyond the +// specified resource constraints. +func (g *containerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) { close(g.stopCh) Logf("Closed stop channel. Waiting for %v workers", len(g.workers)) finished := make(chan struct{}) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 328c81c4a6a..bddeeeb3637 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2729,6 +2729,19 @@ func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind s return nil } +// Wait up to PodListTimeout for getting pods of the specified controller name and return them. +func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) { + rtObject, err := getRuntimeObjectForKind(c, kind, ns, name) + if err != nil { + return nil, err + } + selector, err := getSelectorFromRuntimeObject(rtObject) + if err != nil { + return nil, err + } + return WaitForPodsWithLabel(c, ns, selector) +} + // Returns true if all the specified pods are scheduled, else returns false. 
func podsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (bool, error) { PodStore := testutil.NewPodStore(c, ns, label, fields.Everything()) diff --git a/test/e2e/scheduling/BUILD b/test/e2e/scheduling/BUILD index 655588b8811..84e68a08b42 100644 --- a/test/e2e/scheduling/BUILD +++ b/test/e2e/scheduling/BUILD @@ -26,6 +26,7 @@ go_library( "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", + "//pkg/apis/extensions:go_default_library", "//pkg/quota/evaluator/core:go_default_library", "//pkg/util/system:go_default_library", "//pkg/util/version:go_default_library", diff --git a/test/e2e/scheduling/nvidia-gpus.go b/test/e2e/scheduling/nvidia-gpus.go index 526aecaa463..5deeea25f7e 100644 --- a/test/e2e/scheduling/nvidia-gpus.go +++ b/test/e2e/scheduling/nvidia-gpus.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/test/e2e/framework" imageutils "k8s.io/kubernetes/test/utils/image" @@ -171,20 +172,28 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) { podCreationFunc = makeCudaAdditionTestPod } - // GPU drivers might have already been installed. - if !areGPUsAvailableOnAllSchedulableNodes(f) { - // Install Nvidia Drivers. - ds, err := framework.DsFromManifest(dsYamlUrl) - Expect(err).NotTo(HaveOccurred()) - ds.Namespace = f.Namespace.Name - _, err = f.ClientSet.ExtensionsV1beta1().DaemonSets(f.Namespace.Name).Create(ds) - framework.ExpectNoError(err, "failed to create daemonset") - framework.Logf("Successfully created daemonset to install Nvidia drivers. Waiting for drivers to be installed and GPUs to be available in Node Capacity...") - // Wait for Nvidia GPUs to be available on nodes - Eventually(func() bool { - return areGPUsAvailableOnAllSchedulableNodes(f) - }, driverInstallTimeout, time.Second).Should(BeTrue()) - } + // Creates the DaemonSet that installs Nvidia Drivers. + // The DaemonSet also runs nvidia device plugin for device plugin test. 
+ ds, err := framework.DsFromManifest(dsYamlUrl) + Expect(err).NotTo(HaveOccurred()) + ds.Namespace = f.Namespace.Name + _, err = f.ClientSet.ExtensionsV1beta1().DaemonSets(f.Namespace.Name).Create(ds) + framework.ExpectNoError(err, "failed to create daemonset") + framework.Logf("Successfully created daemonset to install Nvidia drivers.") + + pods, err := framework.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet")) + framework.ExpectNoError(err, "getting pods controlled by the daemonset") + framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.") + rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second}, pods) + framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods") + go rsgather.StartGatheringData() + + // Wait for Nvidia GPUs to be available on nodes + framework.Logf("Waiting for drivers to be installed and GPUs to be available in Node Capacity...") + Eventually(func() bool { + return areGPUsAvailableOnAllSchedulableNodes(f) + }, driverInstallTimeout, time.Second).Should(BeTrue()) + framework.Logf("Creating as many pods as there are Nvidia GPUs and have the pods run a CUDA app") podList := []*v1.Pod{} for i := int64(0); i < getGPUsAvailable(f); i++ { @@ -195,6 +204,13 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) { for _, po := range podList { f.PodClient().WaitForSuccess(po.Name, 5*time.Minute) } + + framework.Logf("Stopping ResourceUsageGather") + constraints := make(map[string]framework.ResourceConstraint) + // For now, just gets summary. Can pass valid constraints in the future. + summary, err := rsgather.StopAndSummarize([]int{50, 90, 100}, constraints) + f.TestSummaries = append(f.TestSummaries, summary) + framework.ExpectNoError(err, "getting resource usage summary") } var _ = SIGDescribe("[Feature:GPU]", func() {
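
Usage sketch: with the API exported above, any e2e test can gather resource usage for a caller-supplied pod list roughly as follows. This is a minimal, illustrative sketch, not code from the patch; the helper name, the 10s/5s intervals, and the percentile choices are assumptions, while the framework calls (NewResourceUsageGatherer, StartGatheringData, StopAndSummarize) mirror the ones added here.

package example

import (
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/test/e2e/framework"
)

// gatherPodUsage (hypothetical helper) tracks resource usage of an arbitrary
// set of pods while runWorkload executes, then appends the summary to the
// test's summaries. Intervals below are illustrative, not from the patch.
func gatherPodUsage(f *framework.Framework, pods *v1.PodList, runWorkload func()) {
	rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{
		InKubemark:                  false,
		MasterOnly:                  false,
		ResourceDataGatheringPeriod: 10 * time.Second, // how often each worker takes a sample
		ProbeDuration:               5 * time.Second,  // window used by the per-node usage probe
	}, pods)
	framework.ExpectNoError(err, "creating ResourceUsageGatherer")
	// StartGatheringData blocks until StopAndSummarize is called, so run it in a goroutine.
	go rsgather.StartGatheringData()

	runWorkload()

	// An empty constraint map just reports usage; pass real constraints to
	// make StopAndSummarize return an error on excessive usage.
	summary, err := rsgather.StopAndSummarize([]int{50, 90, 100}, make(map[string]framework.ResourceConstraint))
	framework.ExpectNoError(err, "summarizing resource usage")
	f.TestSummaries = append(f.TestSummaries, summary)
}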