Add e2e tests for GPU monitoring.

Rohit Agarwal 2018-01-26 14:30:30 -08:00
parent 6ab29c3034
commit d191c57cad
5 changed files with 167 additions and 19 deletions

View File

@@ -80,7 +80,7 @@ type Framework struct {
 	NamespaceDeletionTimeout time.Duration
 	SkipPrivilegedPSPBinding bool // Whether to skip creating a binding to the privileged PSP in the test namespace
-	gatherer *containerResourceGatherer
+	gatherer *ContainerResourceGatherer
 	// Constraints that are passed to a check which is executed after data is gathered to
 	// see if 99% of results are within acceptable bounds. It has to be injected in the test,
 	// as expectations vary greatly. Constraints are grouped by the container names.
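For orientation, the constraints mentioned in that comment are plain framework.ResourceConstraint values keyed by container name. A minimal illustrative sketch; the container name and limits here are hypothetical, not part of this commit:

	// Illustrative only: bound the 99th-percentile usage of one container.
	// CPUConstraint is in cores, MemoryConstraint in bytes.
	constraints := map[string]framework.ResourceConstraint{
		"nvidia-driver-installer": {
			CPUConstraint:    0.2,
			MemoryConstraint: 200 * 1024 * 1024,
		},
	}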

View File

@@ -191,7 +191,7 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 	}
 }
 
-type containerResourceGatherer struct {
+type ContainerResourceGatherer struct {
 	client       clientset.Interface
 	stopCh       chan struct{}
 	workers      []resourceGatherWorker
@@ -208,8 +208,8 @@ type ResourceGathererOptions struct {
 	PrintVerboseLogs bool
 }
 
-func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*containerResourceGatherer, error) {
-	g := containerResourceGatherer{
+func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*ContainerResourceGatherer, error) {
+	g := ContainerResourceGatherer{
 		client:       c,
 		stopCh:       make(chan struct{}),
 		containerIDs: make([]string, 0),
@@ -277,7 +277,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 // StartGatheringData starts a stat gathering worker for each node to track,
 // and blocks until StopAndSummarize is called.
-func (g *containerResourceGatherer) StartGatheringData() {
+func (g *ContainerResourceGatherer) StartGatheringData() {
 	if len(g.workers) == 0 {
 		return
 	}
@@ -294,7 +294,7 @@ func (g *containerResourceGatherer) StartGatheringData() {
 // generates resource summary for the passed-in percentiles, and returns the summary.
 // It returns an error if the resource usage at any percentile is beyond the
 // specified resource constraints.
-func (g *containerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
+func (g *ContainerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
 	close(g.stopCh)
 	Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
 	finished := make(chan struct{})
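With the type exported, callers outside the framework package can drive the whole lifecycle. A minimal sketch, assuming c is a clientset.Interface, pods is a *v1.PodList, and the positional ResourceGathererOptions values match the call site in the scheduling test below; rendering the summary via PrintHumanReadable is an assumption about the framework's summary type:

	// Options sketch: (InKubemark, MasterOnly, ResourceDataGatheringPeriod,
	// ProbeDuration, PrintVerboseLogs) -- field names assumed from context.
	rsgather, err := framework.NewResourceUsageGatherer(c, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second, true}, pods)
	framework.ExpectNoError(err)
	go rsgather.StartGatheringData() // blocks until StopAndSummarize, hence the goroutine
	// ... run the workload under measurement ...
	summary, err := rsgather.StopAndSummarize([]int{50, 90, 99}, nil) // nil map: no constraints enforced
	framework.ExpectNoError(err)
	framework.Logf("%s", summary.PrintHumanReadable())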

View File

@@ -8,6 +8,7 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
+        "accelerator.go",
         "cadvisor.go",
         "custom_metrics_deployments.go",
         "custom_metrics_stackdriver.go",
@@ -21,6 +22,8 @@ go_library(
         "//test/e2e/framework:go_default_library",
         "//test/e2e/framework/metrics:go_default_library",
         "//test/e2e/instrumentation/common:go_default_library",
+        "//test/e2e/scheduling:go_default_library",
+        "//test/utils/image:go_default_library",
         "//vendor/github.com/influxdata/influxdb/client/v2:go_default_library",
         "//vendor/github.com/onsi/ginkgo:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
@@ -29,6 +32,7 @@ go_library(
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
         "//vendor/k8s.io/api/rbac/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@@ -0,0 +1,134 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package monitoring
+
+import (
+	"context"
+	"os"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	"golang.org/x/oauth2/google"
+	gcm "google.golang.org/api/monitoring/v3"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/test/e2e/framework"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
+	"k8s.io/kubernetes/test/e2e/scheduling"
+	"k8s.io/kubernetes/test/utils/image"
+)
+
+// Stackdriver container accelerator metrics, as described here:
+// https://cloud.google.com/monitoring/api/metrics_gcp#gcp-container
+var acceleratorMetrics = []string{
+	"accelerator/duty_cycle",
+	"accelerator/memory_total",
+	"accelerator/memory_used",
+}
+
+var _ = instrumentation.SIGDescribe("Stackdriver Monitoring", func() {
+	BeforeEach(func() {
+		framework.SkipUnlessProviderIs("gce", "gke")
+	})
+
+	f := framework.NewDefaultFramework("stackdriver-monitoring")
+
+	It("should have accelerator metrics [Feature:StackdriverAcceleratorMonitoring]", func() {
+		testStackdriverAcceleratorMonitoring(f)
+	})
+})
+
+func testStackdriverAcceleratorMonitoring(f *framework.Framework) {
+	projectId := framework.TestContext.CloudConfig.ProjectID
+
+	ctx := context.Background()
+	client, err := google.DefaultClient(ctx, gcm.CloudPlatformScope)
+	framework.ExpectNoError(err)
+	gcmService, err := gcm.New(client)
+	framework.ExpectNoError(err)
+
+	// Set this env var if accessing the Stackdriver test endpoint (the default is prod):
+	// $ export STACKDRIVER_API_ENDPOINT_OVERRIDE=https://test-monitoring.sandbox.googleapis.com/
+	basePathOverride := os.Getenv("STACKDRIVER_API_ENDPOINT_OVERRIDE")
+	if basePathOverride != "" {
+		gcmService.BasePath = basePathOverride
+	}
+
+	scheduling.SetupNVIDIAGPUNode(f, false)
+
+	// TODO: remove this after the cAdvisor race is fixed.
+	time.Sleep(time.Minute)
+
+	f.PodClient().Create(&v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: rcName,
+		},
+		Spec: v1.PodSpec{
+			RestartPolicy: v1.RestartPolicyNever,
+			Containers: []v1.Container{
+				{
+					Name:    rcName,
+					Image:   image.GetE2EImage(image.CudaVectorAdd),
+					Command: []string{"/bin/sh", "-c"},
+					Args:    []string{"nvidia-smi && sleep infinity"},
+					Resources: v1.ResourceRequirements{
+						Limits: v1.ResourceList{
+							framework.NVIDIAGPUResourceName: *resource.NewQuantity(1, resource.DecimalSI),
+						},
+					},
+				},
+			},
+		},
+	})
+
+	metricsMap := map[string]bool{}
+	pollingFunction := checkForAcceleratorMetrics(projectId, gcmService, time.Now(), metricsMap)
+	err = wait.Poll(pollFrequency, pollTimeout, pollingFunction)
+	if err != nil {
+		framework.Logf("Missing metrics: %+v\n", metricsMap)
+	}
+	framework.ExpectNoError(err)
+}
+
+func checkForAcceleratorMetrics(projectId string, gcmService *gcm.Service, start time.Time, metricsMap map[string]bool) func() (bool, error) {
+	return func() (bool, error) {
+		counter := 0
+		for _, metric := range acceleratorMetrics {
+			metricsMap[metric] = false
+		}
+		for _, metric := range acceleratorMetrics {
+			// TODO: check only for metrics from this cluster
+			ts, err := fetchTimeSeries(projectId, gcmService, metric, start, time.Now())
+			framework.ExpectNoError(err)
+			if len(ts) > 0 {
+				counter = counter + 1
+				metricsMap[metric] = true
+				framework.Logf("Received %v timeseries for metric %v", len(ts), metric)
+			} else {
+				framework.Logf("No timeseries for metric %v", metric)
+			}
+		}
+		if counter < 3 {
+			return false, nil
+		}
+		return true, nil
+	}
+}
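fetchTimeSeries is shared with the other Stackdriver tests in this package and is not part of this diff. For orientation, a minimal sketch of what such a helper could look like against the GCM v3 Go client; the container-metric-type prefix is an assumption based on the metrics page linked above, and "fmt" is assumed to be imported:

	// Sketch only: list time series for one accelerator metric in [start, end].
	// The metric.type prefix below is an assumption, not taken from this commit.
	func fetchTimeSeries(projectId string, gcmService *gcm.Service, metric string, start, end time.Time) ([]*gcm.TimeSeries, error) {
		response, err := gcmService.Projects.TimeSeries.
			List(fmt.Sprintf("projects/%s", projectId)).
			Filter(fmt.Sprintf("metric.type = \"container.googleapis.com/container/%s\"", metric)).
			IntervalStartTime(start.Format(time.RFC3339)).
			IntervalEndTime(end.Format(time.RFC3339)).
			Do()
		if err != nil {
			return nil, err
		}
		return response.TimeSeries, nil
	}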

View File

@@ -153,7 +153,8 @@ func getGPUsAvailable(f *framework.Framework) int64 {
 	return gpusAvailable
 }
 
-func testNvidiaGPUsOnCOS(f *framework.Framework) {
+func SetupNVIDIAGPUNode(f *framework.Framework, setupResourceGatherer bool) *framework.ContainerResourceGatherer {
 	// Skip the test if the base image is not COS.
 	// TODO: Add support for other base images.
 	// CUDA apps require host mounts, which are not portable across base images (yet).
@@ -163,7 +164,11 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 	}
 	framework.Logf("Cluster is running on COS. Proceeding with test")
 
-	if f.BaseName == "device-plugin-gpus" {
+	if f.BaseName == "gpus" {
+		dsYamlUrl = "https://raw.githubusercontent.com/ContainerEngine/accelerators/master/cos-nvidia-gpu-installer/daemonset.yaml"
+		gpuResourceName = v1.ResourceNvidiaGPU
+		podCreationFunc = makeCudaAdditionTestPod
+	} else {
 		dsYamlUrlFromEnv := os.Getenv("NVIDIA_DRIVER_INSTALLER_DAEMONSET")
 		if dsYamlUrlFromEnv != "" {
 			dsYamlUrl = dsYamlUrlFromEnv
@@ -172,33 +177,33 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 		}
 		gpuResourceName = framework.NVIDIAGPUResourceName
 		podCreationFunc = makeCudaAdditionDevicePluginTestPod
-	} else {
-		dsYamlUrl = "https://raw.githubusercontent.com/ContainerEngine/accelerators/master/cos-nvidia-gpu-installer/daemonset.yaml"
-		gpuResourceName = v1.ResourceNvidiaGPU
-		podCreationFunc = makeCudaAdditionTestPod
 	}
 
 	framework.Logf("Using %v", dsYamlUrl)
 	// Creates the DaemonSet that installs Nvidia drivers.
 	// The DaemonSet also runs the nvidia device plugin for the device plugin test.
 	ds, err := framework.DsFromManifest(dsYamlUrl)
 	Expect(err).NotTo(HaveOccurred())
 	ds.Namespace = f.Namespace.Name
 	_, err = f.ClientSet.ExtensionsV1beta1().DaemonSets(f.Namespace.Name).Create(ds)
-	framework.ExpectNoError(err, "failed to create daemonset")
+	framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset")
 	framework.Logf("Successfully created daemonset to install Nvidia drivers.")
 
 	pods, err := framework.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet"))
-	framework.ExpectNoError(err, "getting pods controlled by the daemonset")
+	framework.ExpectNoError(err, "failed to get pods controlled by the nvidia-driver-installer daemonset")
+
+	devicepluginPods, err := framework.WaitForControlledPods(f.ClientSet, "kube-system", "nvidia-gpu-device-plugin", extensionsinternal.Kind("DaemonSet"))
+	if err == nil {
+		framework.Logf("Adding deviceplugin addon pod.")
+		pods.Items = append(pods.Items, devicepluginPods.Items...)
+	}
 
-	framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.")
-	rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second, true}, pods)
-	framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods")
-	go rsgather.StartGatheringData()
+	var rsgather *framework.ContainerResourceGatherer
+	if setupResourceGatherer {
+		framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.")
+		rsgather, err = framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second, true}, pods)
+		framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods")
+		go rsgather.StartGatheringData()
+	}
 
 	// Wait for Nvidia GPUs to be available on nodes
 	framework.Logf("Waiting for drivers to be installed and GPUs to be available in Node Capacity...")
@@ -206,6 +211,11 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 		return areGPUsAvailableOnAllSchedulableNodes(f)
 	}, driverInstallTimeout, time.Second).Should(BeTrue())
+
+	return rsgather
+}
+
+func testNvidiaGPUsOnCOS(f *framework.Framework) {
+	rsgather := SetupNVIDIAGPUNode(f, true)
 	framework.Logf("Creating as many pods as there are Nvidia GPUs and have the pods run a CUDA app")
 	podList := []*v1.Pod{}
 	for i := int64(0); i < getGPUsAvailable(f); i++ {