Extend test/e2e/scheduling/nvidia-gpus.go to track resource usage of installer and device plugin containers.

To support this, export certain functions and fields in framework/resource_usage_gatherer.go so that it can be used from any e2e test to track the resource usage of any specified pods, with a caller-specified probe interval and duration.
Jiaying Zhang 2017-09-26 22:27:30 -07:00
parent beefab8a8e
commit ae36f8ee95
5 changed files with 111 additions and 68 deletions
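
Taken together, these changes let an e2e test point the resource usage gatherer at an arbitrary set of pods rather than only kube-system. Below is a minimal sketch of the intended call pattern, assuming a test in the e2e scheduling package; the helper name and the DaemonSet name argument are placeholders, while the framework identifiers come from the diffs that follow:

package scheduling

import (
	"time"

	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/test/e2e/framework"
)

// gatherDaemonSetUsage is an illustrative helper (not part of this commit) showing the
// intended call pattern for the newly exported gatherer API; dsName is a placeholder.
func gatherDaemonSetUsage(f *framework.Framework, dsName string) {
	// Collect the pods created by the DaemonSet so that only they are tracked.
	pods, err := framework.WaitForControlledPods(f.ClientSet, f.Namespace.Name, dsName, extensionsinternal.Kind("DaemonSet"))
	framework.ExpectNoError(err, "getting pods controlled by the daemonset")

	// Short period and probe duration, mirroring the values used in nvidia-gpus.go below.
	rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{
		InKubemark:                  false,
		MasterOnly:                  false,
		ResourceDataGatheringPeriod: 2 * time.Second,
		ProbeDuration:               2 * time.Second,
	}, pods)
	framework.ExpectNoError(err, "creating ResourceUsageGatherer for the daemonset pods")

	// StartGatheringData blocks until StopAndSummarize is called, so run it in a goroutine.
	go rsgather.StartGatheringData()

	// ... exercise the workload under test here ...

	// Empty constraints: only report usage at the requested percentiles, enforce nothing.
	constraints := make(map[string]framework.ResourceConstraint)
	summary, err := rsgather.StopAndSummarize([]int{50, 90, 100}, constraints)
	framework.ExpectNoError(err, "getting resource usage summary")
	f.TestSummaries = append(f.TestSummaries, summary)
}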

test/e2e/framework/framework.go

@@ -203,13 +203,15 @@ func (f *Framework) BeforeEach() {
if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" {
var err error
f.gatherer, err = NewResourceUsageGatherer(f.ClientSet, ResourceGathererOptions{
inKubemark: ProviderIs("kubemark"),
masterOnly: TestContext.GatherKubeSystemResourceUsageData == "master",
})
InKubemark: ProviderIs("kubemark"),
MasterOnly: TestContext.GatherKubeSystemResourceUsageData == "master",
ResourceDataGatheringPeriod: 60 * time.Second,
ProbeDuration: 5 * time.Second,
}, nil)
if err != nil {
Logf("Error while creating NewResourceUsageGatherer: %v", err)
} else {
go f.gatherer.startGatheringData()
go f.gatherer.StartGatheringData()
}
}
@@ -319,7 +321,7 @@ func (f *Framework) AfterEach() {
if TestContext.GatherKubeSystemResourceUsageData != "false" && TestContext.GatherKubeSystemResourceUsageData != "none" && f.gatherer != nil {
By("Collecting resource usage data")
summary, resourceViolationError := f.gatherer.stopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints)
summary, resourceViolationError := f.gatherer.StopAndSummarize([]int{90, 99, 100}, f.AddonResourceConstraints)
defer ExpectNoError(resourceViolationError)
f.TestSummaries = append(f.TestSummaries, summary)
}

test/e2e/framework/resource_usage_gatherer.go

@@ -27,17 +27,13 @@ import (
"text/tabwriter"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/util/system"
)
const (
resourceDataGatheringPeriod = 60 * time.Second
probeDuration = 15 * time.Second
)
type ResourceConstraint struct {
CPUConstraint float64
MemoryConstraint uint64
@@ -139,6 +135,8 @@ type resourceGatherWorker struct {
dataSeries []ResourceUsagePerContainer
finished bool
inKubemark bool
resourceDataGatheringPeriod time.Duration
probeDuration time.Duration
}
func (w *resourceGatherWorker) singleProbe() {
@@ -156,13 +154,14 @@ func (w *resourceGatherWorker) singleProbe() {
}
}
} else {
nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs })
nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
if err != nil {
Logf("Error while reading data from %v: %v", w.nodeName, err)
return
}
for k, v := range nodeUsage {
data[k] = v
Logf("Get container %v usage on node %v. CPUUsageInCores: %v, MemoryUsageInBytes: %v, MemoryWorkingSetInBytes: %v", k, w.nodeName, v.CPUUsageInCores, v.MemoryUsageInBytes, v.MemoryWorkingSetInBytes)
}
}
w.dataSeries = append(w.dataSeries, data)
@@ -178,7 +177,7 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
w.singleProbe()
for {
select {
case <-time.After(resourceDataGatheringPeriod):
case <-time.After(w.resourceDataGatheringPeriod):
w.singleProbe()
case <-w.stopCh:
return
@@ -189,19 +188,6 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
}
}
func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c clientset.Interface) {
if len(g.workers) == 0 {
return
}
delayPeriod := resourceDataGatheringPeriod / time.Duration(len(g.workers))
delay := time.Duration(0)
for i := range g.workers {
go g.workers[i].gather(delay)
delay += delayPeriod
}
g.workerWg.Wait()
}
type containerResourceGatherer struct {
client clientset.Interface
stopCh chan struct{}
@@ -212,11 +198,13 @@ type containerResourceGatherer struct {
}
type ResourceGathererOptions struct {
inKubemark bool
masterOnly bool
InKubemark bool
MasterOnly bool
ResourceDataGatheringPeriod time.Duration
ProbeDuration time.Duration
}
func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions) (*containerResourceGatherer, error) {
func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*containerResourceGatherer, error) {
g := containerResourceGatherer{
client: c,
stopCh: make(chan struct{}),
@@ -224,7 +212,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
options: options,
}
if options.inKubemark {
if options.InKubemark {
g.workerWg.Add(1)
g.workers = append(g.workers, resourceGatherWorker{
inKubemark: true,
@@ -233,12 +221,19 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
finished: false,
})
} else {
pods, err := c.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
// Tracks kube-system pods if no valid PodList is passed in.
var err error
if pods == nil {
pods, err = c.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
if err != nil {
Logf("Error while listing Pods: %v", err)
return nil, err
}
}
for _, pod := range pods.Items {
for _, container := range pod.Status.InitContainerStatuses {
g.containerIDs = append(g.containerIDs, container.Name)
}
for _, container := range pod.Status.ContainerStatuses {
g.containerIDs = append(g.containerIDs, container.Name)
}
@@ -250,7 +245,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
}
for _, node := range nodeList.Items {
if !options.masterOnly || system.IsMasterNode(node.Name) {
if !options.MasterOnly || system.IsMasterNode(node.Name) {
g.workerWg.Add(1)
g.workers = append(g.workers, resourceGatherWorker{
c: c,
@@ -260,8 +255,10 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
stopCh: g.stopCh,
finished: false,
inKubemark: false,
resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod,
probeDuration: options.ProbeDuration,
})
if options.masterOnly {
if options.MasterOnly {
break
}
}
@@ -270,12 +267,26 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
return &g, nil
}
// startGatheringData blocks until stopAndSummarize is called.
func (g *containerResourceGatherer) startGatheringData() {
g.getKubeSystemContainersResourceUsage(g.client)
// StartGatheringData starts a stat gathering worker for each node to track,
// and blocks until StopAndSummarize is called.
func (g *containerResourceGatherer) StartGatheringData() {
if len(g.workers) == 0 {
return
}
delayPeriod := g.options.ResourceDataGatheringPeriod / time.Duration(len(g.workers))
delay := time.Duration(0)
for i := range g.workers {
go g.workers[i].gather(delay)
delay += delayPeriod
}
g.workerWg.Wait()
}
func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
// StopAndSummarize stops stat gathering workers, processes the collected stats,
// generates resource summary for the passed-in percentiles, and returns the summary.
// It returns an error if the resource usage at any percentile is beyond the
// specified resource constraints.
func (g *containerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
close(g.stopCh)
Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
finished := make(chan struct{})
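
The exported StartGatheringData above staggers its per-node workers across one ResourceDataGatheringPeriod so their probes do not all fire at the same instant. A small standalone sketch of that stagger arithmetic, assuming the 60-second period configured in framework.go and three tracked nodes:

package main

import (
	"fmt"
	"time"
)

func main() {
	period := 60 * time.Second // ResourceDataGatheringPeriod, as set in framework.go above
	workers := 3               // assumed: one gather worker per tracked node
	delayPeriod := period / time.Duration(workers)
	delay := time.Duration(0)
	for i := 0; i < workers; i++ {
		fmt.Printf("worker %d starts probing after %v\n", i, delay)
		delay += delayPeriod
	}
	// Prints 0s, 20s and 40s: the probes are spread evenly across one gathering period.
}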

test/e2e/framework/util.go

@@ -2729,6 +2729,19 @@ func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind s
return nil
}
// Wait up to PodListTimeout to get the pods controlled by the specified controller and return them.
func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) {
rtObject, err := getRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
return nil, err
}
selector, err := getSelectorFromRuntimeObject(rtObject)
if err != nil {
return nil, err
}
return WaitForPodsWithLabel(c, ns, selector)
}
// Returns true if all the specified pods are scheduled, else returns false.
func podsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (bool, error) {
PodStore := testutil.NewPodStore(c, ns, label, fields.Everything())

test/e2e/scheduling/BUILD

@@ -26,6 +26,7 @@ go_library(
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/quota/evaluator/core:go_default_library",
"//pkg/util/system:go_default_library",
"//pkg/util/version:go_default_library",

test/e2e/scheduling/nvidia-gpus.go

@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/test/e2e/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@@ -171,20 +172,28 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
podCreationFunc = makeCudaAdditionTestPod
}
// GPU drivers might have already been installed.
if !areGPUsAvailableOnAllSchedulableNodes(f) {
// Install Nvidia Drivers.
// Creates the DaemonSet that installs Nvidia Drivers.
// The DaemonSet also runs the nvidia device plugin, which is needed for the device plugin test.
ds, err := framework.DsFromManifest(dsYamlUrl)
Expect(err).NotTo(HaveOccurred())
ds.Namespace = f.Namespace.Name
_, err = f.ClientSet.ExtensionsV1beta1().DaemonSets(f.Namespace.Name).Create(ds)
framework.ExpectNoError(err, "failed to create daemonset")
framework.Logf("Successfully created daemonset to install Nvidia drivers. Waiting for drivers to be installed and GPUs to be available in Node Capacity...")
framework.Logf("Successfully created daemonset to install Nvidia drivers.")
pods, err := framework.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet"))
framework.ExpectNoError(err, "getting pods controlled by the daemonset")
framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.")
rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second}, pods)
framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods")
go rsgather.StartGatheringData()
// Wait for Nvidia GPUs to be available on nodes
framework.Logf("Waiting for drivers to be installed and GPUs to be available in Node Capacity...")
Eventually(func() bool {
return areGPUsAvailableOnAllSchedulableNodes(f)
}, driverInstallTimeout, time.Second).Should(BeTrue())
}
framework.Logf("Creating as many pods as there are Nvidia GPUs and have the pods run a CUDA app")
podList := []*v1.Pod{}
for i := int64(0); i < getGPUsAvailable(f); i++ {
@@ -195,6 +204,13 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
for _, po := range podList {
f.PodClient().WaitForSuccess(po.Name, 5*time.Minute)
}
framework.Logf("Stopping ResourceUsageGather")
constraints := make(map[string]framework.ResourceConstraint)
// For now, just gets summary. Can pass valid constraints in the future.
summary, err := rsgather.StopAndSummarize([]int{50, 90, 100}, constraints)
f.TestSummaries = append(f.TestSummaries, summary)
framework.ExpectNoError(err, "getting resource usage summary")
}
var _ = SIGDescribe("[Feature:GPU]", func() {