diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go
index f5341d79c00..821cc69087e 100644
--- a/test/e2e/framework/framework.go
+++ b/test/e2e/framework/framework.go
@@ -80,7 +80,7 @@ type Framework struct {
 	NamespaceDeletionTimeout time.Duration
 	SkipPrivilegedPSPBinding bool // Whether to skip creating a binding to the privileged PSP in the test namespace
 
-	gatherer *containerResourceGatherer
+	gatherer *ContainerResourceGatherer
 	// Constraints that passed to a check which is executed after data is gathered to
 	// see if 99% of results are within acceptable bounds. It has to be injected in the test,
 	// as expectations vary greatly. Constraints are grouped by the container names.
diff --git a/test/e2e/framework/resource_usage_gatherer.go b/test/e2e/framework/resource_usage_gatherer.go
index 998968bf859..36797add0bc 100644
--- a/test/e2e/framework/resource_usage_gatherer.go
+++ b/test/e2e/framework/resource_usage_gatherer.go
@@ -191,7 +191,7 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 	}
 }
 
-type containerResourceGatherer struct {
+type ContainerResourceGatherer struct {
 	client       clientset.Interface
 	stopCh       chan struct{}
 	workers      []resourceGatherWorker
@@ -208,8 +208,8 @@ type ResourceGathererOptions struct {
 	PrintVerboseLogs bool
 }
 
-func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*containerResourceGatherer, error) {
-	g := containerResourceGatherer{
+func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*ContainerResourceGatherer, error) {
+	g := ContainerResourceGatherer{
 		client:       c,
 		stopCh:       make(chan struct{}),
 		containerIDs: make([]string, 0),
@@ -277,7 +277,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
 
 // StartGatheringData starts a stat gathering worker for each node to track,
 // and blocks until StopAndSummarize is called.
-func (g *containerResourceGatherer) StartGatheringData() {
+func (g *ContainerResourceGatherer) StartGatheringData() {
 	if len(g.workers) == 0 {
 		return
 	}
@@ -294,7 +294,7 @@ func (g *containerResourceGatherer) StartGatheringData() {
 // generates resource summary for the passed-in percentiles, and returns the summary.
 // It returns an error if the resource usage at any percentile is beyond the
 // specified resource constraints.
-func (g *containerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
+func (g *ContainerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
 	close(g.stopCh)
 	Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
 	finished := make(chan struct{})
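With the gatherer type exported, resource-usage gathering is no longer private to the framework package. A minimal sketch of how an outside e2e suite can drive it, reusing the positional ResourceGathererOptions literal that nvidia-gpus.go uses below; the ResourceConstraint field names and values are assumptions for illustration, not part of this diff:

// Sketch only: drive the exported gatherer from another e2e package.
// pods is a *v1.PodList of the pods to measure (e.g., from WaitForControlledPods).
rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet,
	framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second, true}, pods)
framework.ExpectNoError(err, "creating ResourceUsageGatherer")
go rsgather.StartGatheringData() // blocks until StopAndSummarize, so run in a goroutine

// ... exercise the workload under measurement ...

// Constraint keys and values here are illustrative placeholders.
constraints := map[string]framework.ResourceConstraint{
	"nvidia-driver-installer": {CPUConstraint: 1.0, MemoryConstraint: 200 * 1024 * 1024},
}
summary, err := rsgather.StopAndSummarize([]int{50, 90, 99}, constraints)
framework.ExpectNoError(err, "resource usage exceeded constraints")
framework.Logf("Resource usage: %v", summary)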
diff --git a/test/e2e/instrumentation/monitoring/BUILD b/test/e2e/instrumentation/monitoring/BUILD
index 4a0523411c5..535098bcfb2 100644
--- a/test/e2e/instrumentation/monitoring/BUILD
+++ b/test/e2e/instrumentation/monitoring/BUILD
@@ -8,6 +8,7 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
+        "accelerator.go",
         "cadvisor.go",
         "custom_metrics_deployments.go",
         "custom_metrics_stackdriver.go",
@@ -21,6 +22,8 @@ go_library(
         "//test/e2e/framework:go_default_library",
         "//test/e2e/framework/metrics:go_default_library",
         "//test/e2e/instrumentation/common:go_default_library",
+        "//test/e2e/scheduling:go_default_library",
+        "//test/utils/image:go_default_library",
         "//vendor/github.com/influxdata/influxdb/client/v2:go_default_library",
         "//vendor/github.com/onsi/ginkgo:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
@@ -29,6 +32,7 @@ go_library(
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
         "//vendor/k8s.io/api/rbac/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
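For context when reading the new file below: accelerator.go references several package-level helpers (rcName, pollFrequency, pollTimeout, fetchTimeSeries) that are declared in sibling files of the monitoring package and do not appear in this diff. Roughly, and only as assumptions about those sibling declarations, they have shapes like:

// Assumed shapes; the real declarations live in other files of this
// package, and the concrete values here are placeholders, not from this PR.
const (
	rcName        = "resource-consumer" // pod/container name shared by the Stackdriver tests
	pollFrequency = 5 * time.Second     // how often to re-query Stackdriver
	pollTimeout   = 7 * time.Minute     // how long to keep polling before giving up
)

// fetchTimeSeries queries Stackdriver for one metric over [start, end).
func fetchTimeSeries(projectId string, gcmService *gcm.Service, metric string,
	start, end time.Time) ([]*gcm.TimeSeries, error)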
"github.com/onsi/ginkgo" + "golang.org/x/oauth2/google" + gcm "google.golang.org/api/monitoring/v3" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" + "k8s.io/kubernetes/test/e2e/scheduling" + "k8s.io/kubernetes/test/utils/image" +) + +// Stackdriver container accelerator metrics, as described here: +// https://cloud.google.com/monitoring/api/metrics_gcp#gcp-container +var acceleratorMetrics = []string{ + "accelerator/duty_cycle", + "accelerator/memory_total", + "accelerator/memory_used", +} + +var _ = instrumentation.SIGDescribe("Stackdriver Monitoring", func() { + BeforeEach(func() { + framework.SkipUnlessProviderIs("gce", "gke") + }) + + f := framework.NewDefaultFramework("stackdriver-monitoring") + + It("should have accelerator metrics [Feature:StackdriverAcceleratorMonitoring]", func() { + testStackdriverAcceleratorMonitoring(f) + }) + +}) + +func testStackdriverAcceleratorMonitoring(f *framework.Framework) { + projectId := framework.TestContext.CloudConfig.ProjectID + + ctx := context.Background() + client, err := google.DefaultClient(ctx, gcm.CloudPlatformScope) + + gcmService, err := gcm.New(client) + + framework.ExpectNoError(err) + + // set this env var if accessing Stackdriver test endpoint (default is prod): + // $ export STACKDRIVER_API_ENDPOINT_OVERRIDE=https://test-monitoring.sandbox.googleapis.com/ + basePathOverride := os.Getenv("STACKDRIVER_API_ENDPOINT_OVERRIDE") + if basePathOverride != "" { + gcmService.BasePath = basePathOverride + } + + scheduling.SetupNVIDIAGPUNode(f, false) + + // TODO: remove this after cAdvisor race is fixed. 
+
+func checkForAcceleratorMetrics(projectId string, gcmService *gcm.Service, start time.Time, metricsMap map[string]bool) func() (bool, error) {
+	return func() (bool, error) {
+		counter := 0
+		for _, metric := range acceleratorMetrics {
+			metricsMap[metric] = false
+		}
+		for _, metric := range acceleratorMetrics {
+			// TODO: check only for metrics from this cluster
+			ts, err := fetchTimeSeries(projectId, gcmService, metric, start, time.Now())
+			framework.ExpectNoError(err)
+			if len(ts) > 0 {
+				counter++
+				metricsMap[metric] = true
+				framework.Logf("Received %v timeseries for metric %v", len(ts), metric)
+			} else {
+				framework.Logf("No timeseries for metric %v", metric)
+			}
+		}
+		if counter < len(acceleratorMetrics) {
+			return false, nil
+		}
+		return true, nil
+	}
+}
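checkForAcceleratorMetrics above delegates the actual Stackdriver query to fetchTimeSeries, defined elsewhere in this package. As a rough, assumption-labeled sketch of the kind of call it presumably makes against the google.golang.org/api/monitoring/v3 client (the "container.googleapis.com/" metric prefix, the filter string, and the RFC3339 interval formatting are reconstructions, not code from this PR):

// Sketch: list Stackdriver time series for one container metric.
func listAcceleratorTimeSeries(projectId string, gcmService *gcm.Service, metric string, start, end time.Time) ([]*gcm.TimeSeries, error) {
	response, err := gcmService.Projects.TimeSeries.
		List("projects/" + projectId).
		Filter(`metric.type = "container.googleapis.com/` + metric + `"`).
		IntervalStartTime(start.Format(time.RFC3339)).
		IntervalEndTime(end.Format(time.RFC3339)).
		Do()
	if err != nil {
		return nil, err
	}
	return response.TimeSeries, nil
}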
diff --git a/test/e2e/scheduling/nvidia-gpus.go b/test/e2e/scheduling/nvidia-gpus.go
index f3b4601073c..eae99c860f5 100644
--- a/test/e2e/scheduling/nvidia-gpus.go
+++ b/test/e2e/scheduling/nvidia-gpus.go
@@ -153,7 +153,8 @@ func getGPUsAvailable(f *framework.Framework) int64 {
 	return gpusAvailable
 }
 
-func testNvidiaGPUsOnCOS(f *framework.Framework) {
+func SetupNVIDIAGPUNode(f *framework.Framework, setupResourceGatherer bool) *framework.ContainerResourceGatherer {
+
 	// Skip the test if the base image is not COS.
 	// TODO: Add support for other base images.
 	// CUDA apps require host mounts which is not portable across base images (yet).
@@ -163,7 +164,11 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 	}
 	framework.Logf("Cluster is running on COS. Proceeding with test")
 
-	if f.BaseName == "device-plugin-gpus" {
+	if f.BaseName == "gpus" {
+		dsYamlUrl = "https://raw.githubusercontent.com/ContainerEngine/accelerators/master/cos-nvidia-gpu-installer/daemonset.yaml"
+		gpuResourceName = v1.ResourceNvidiaGPU
+		podCreationFunc = makeCudaAdditionTestPod
+	} else {
 		dsYamlUrlFromEnv := os.Getenv("NVIDIA_DRIVER_INSTALLER_DAEMONSET")
 		if dsYamlUrlFromEnv != "" {
 			dsYamlUrl = dsYamlUrlFromEnv
@@ -172,33 +177,33 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 		}
 		gpuResourceName = framework.NVIDIAGPUResourceName
 		podCreationFunc = makeCudaAdditionDevicePluginTestPod
-	} else {
-		dsYamlUrl = "https://raw.githubusercontent.com/ContainerEngine/accelerators/master/cos-nvidia-gpu-installer/daemonset.yaml"
-		gpuResourceName = v1.ResourceNvidiaGPU
-		podCreationFunc = makeCudaAdditionTestPod
 	}
 
 	framework.Logf("Using %v", dsYamlUrl)
 	// Creates the DaemonSet that installs Nvidia Drivers.
-	// The DaemonSet also runs nvidia device plugin for device plugin test.
 	ds, err := framework.DsFromManifest(dsYamlUrl)
 	Expect(err).NotTo(HaveOccurred())
 	ds.Namespace = f.Namespace.Name
 	_, err = f.ClientSet.ExtensionsV1beta1().DaemonSets(f.Namespace.Name).Create(ds)
-	framework.ExpectNoError(err, "failed to create daemonset")
+	framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset")
 	framework.Logf("Successfully created daemonset to install Nvidia drivers.")
 
 	pods, err := framework.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet"))
-	framework.ExpectNoError(err, "getting pods controlled by the daemonset")
+	framework.ExpectNoError(err, "failed to get pods controlled by the nvidia-driver-installer daemonset")
+
 	devicepluginPods, err := framework.WaitForControlledPods(f.ClientSet, "kube-system", "nvidia-gpu-device-plugin", extensionsinternal.Kind("DaemonSet"))
 	if err == nil {
 		framework.Logf("Adding deviceplugin addon pod.")
 		pods.Items = append(pods.Items, devicepluginPods.Items...)
 	}
-	framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.")
-	rsgather, err := framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second, true}, pods)
-	framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods")
-	go rsgather.StartGatheringData()
+
+	var rsgather *framework.ContainerResourceGatherer
+	if setupResourceGatherer {
+		framework.Logf("Starting ResourceUsageGather for the created DaemonSet pods.")
+		rsgather, err = framework.NewResourceUsageGatherer(f.ClientSet, framework.ResourceGathererOptions{false, false, 2 * time.Second, 2 * time.Second, true}, pods)
+		framework.ExpectNoError(err, "creating ResourceUsageGather for the daemonset pods")
+		go rsgather.StartGatheringData()
+	}
 
 	// Wait for Nvidia GPUs to be available on nodes
 	framework.Logf("Waiting for drivers to be installed and GPUs to be available in Node Capacity...")
@@ -206,6 +211,11 @@ func testNvidiaGPUsOnCOS(f *framework.Framework) {
 		return areGPUsAvailableOnAllSchedulableNodes(f)
 	}, driverInstallTimeout, time.Second).Should(BeTrue())
 
+	return rsgather
+}
+
+func testNvidiaGPUsOnCOS(f *framework.Framework) {
+	rsgather := SetupNVIDIAGPUNode(f, true)
 	framework.Logf("Creating as many pods as there are Nvidia GPUs and have the pods run a CUDA app")
 	podList := []*v1.Pod{}
 	for i := int64(0); i < getGPUsAvailable(f); i++ {