diff --git a/hack/.linted_packages b/hack/.linted_packages
index 66d3afd446a..4ff4b3d3a49 100644
--- a/hack/.linted_packages
+++ b/hack/.linted_packages
@@ -185,6 +185,7 @@ pkg/kubelet/container
 pkg/kubelet/envvars
 pkg/kubelet/eviction
 pkg/kubelet/eviction/api
+pkg/kubelet/gpu/nvidia
 pkg/kubelet/util/csr
 pkg/kubelet/util/format
 pkg/kubelet/util/ioutils
diff --git a/pkg/kubelet/gpu/nvidia/helpers.go b/pkg/kubelet/gpu/nvidia/helpers.go
index 0abedbf4014..a6ed7c4f935 100644
--- a/pkg/kubelet/gpu/nvidia/helpers.go
+++ b/pkg/kubelet/gpu/nvidia/helpers.go
@@ -18,14 +18,16 @@ package nvidia
 
 import "k8s.io/apimachinery/pkg/util/sets"
 
+type containerToGPU map[string]sets.String
+
 // podGPUs represents a list of pod to GPU mappings.
 type podGPUs struct {
-	podGPUMapping map[string]sets.String
+	podGPUMapping map[string]containerToGPU
 }
 
 func newPodGPUs() *podGPUs {
	return &podGPUs{
-		podGPUMapping: map[string]sets.String{},
+		podGPUMapping: make(map[string]containerToGPU),
 	}
 }
 func (pgpu *podGPUs) pods() sets.String {
@@ -36,12 +38,26 @@ func (pgpu *podGPUs) pods() sets.String {
 	return ret
 }
 
-func (pgpu *podGPUs) insert(podUID string, device string) {
+func (pgpu *podGPUs) insert(podUID, contName string, device string) {
 	if _, exists := pgpu.podGPUMapping[podUID]; !exists {
-		pgpu.podGPUMapping[podUID] = sets.NewString(device)
-	} else {
-		pgpu.podGPUMapping[podUID].Insert(device)
+		pgpu.podGPUMapping[podUID] = make(containerToGPU)
 	}
+	if _, exists := pgpu.podGPUMapping[podUID][contName]; !exists {
+		pgpu.podGPUMapping[podUID][contName] = sets.NewString()
+	}
+	pgpu.podGPUMapping[podUID][contName].Insert(device)
+}
+
+func (pgpu *podGPUs) getGPUs(podUID, contName string) sets.String {
+	containers, exists := pgpu.podGPUMapping[podUID]
+	if !exists {
+		return nil
+	}
+	devices, exists := containers[contName]
+	if !exists {
+		return nil
+	}
+	return devices
 }
 
 func (pgpu *podGPUs) delete(pods []string) {
@@ -52,8 +68,10 @@ func (pgpu *podGPUs) delete(pods []string) {
 
 func (pgpu *podGPUs) devices() sets.String {
 	ret := sets.NewString()
-	for _, devices := range pgpu.podGPUMapping {
-		ret = ret.Union(devices)
+	for _, containerToGPU := range pgpu.podGPUMapping {
+		for _, deviceSet := range containerToGPU {
+			ret = ret.Union(deviceSet)
+		}
 	}
 	return ret
 }
diff --git a/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go b/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go
index 43b0e3b32a0..d83c83d386e 100644
--- a/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go
+++ b/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go
@@ -48,7 +48,7 @@ const (
 
 type activePodsLister interface {
 	// Returns a list of active pods on the node.
-	GetRunningPods() ([]*v1.Pod, error)
+	GetActivePods() []*v1.Pod
 }
 
 // nvidiaGPUManager manages nvidia gpu devices.
@@ -102,10 +102,7 @@ func (ngm *nvidiaGPUManager) Start() error {
 		return err
 	}
 	// It's possible that the runtime isn't available now.
-	allocatedGPUs, err := ngm.gpusInUse()
-	if err == nil {
-		ngm.allocated = allocatedGPUs
-	}
+	ngm.allocated = ngm.gpusInUse()
 	// We ignore errors when identifying allocated GPUs because it is possible that the runtime interfaces may be not be logically up.
 	return nil
 }
@@ -141,16 +138,16 @@ func (ngm *nvidiaGPUManager) AllocateGPU(pod *v1.Pod, container *v1.Container) (
 	defer ngm.Unlock()
 	if ngm.allocated == nil {
 		// Initialization is not complete. Try now. Failures can no longer be tolerated.
-		allocated, err := ngm.gpusInUse()
-		if err != nil {
-			return nil, fmt.Errorf("Failed to allocate GPUs because of issues identifying GPUs in use: %v", err)
-		}
-		ngm.allocated = allocated
+		ngm.allocated = ngm.gpusInUse()
 	} else {
 		// update internal list of GPUs in use prior to allocating new GPUs.
-		if err := ngm.updateAllocatedGPUs(); err != nil {
-			return nil, fmt.Errorf("Failed to allocate GPUs because of issues with updating GPUs in use: %v", err)
-		}
+		ngm.updateAllocatedGPUs()
+	}
+	// Check if GPUs have already been allocated. If so return them right away.
+	// This can happen if a container restarts for example.
+	if devices := ngm.allocated.getGPUs(string(pod.UID), container.Name); devices != nil {
+		glog.V(2).Infof("Found pre-allocated GPUs for container %q in Pod %q: %v", container.Name, pod.UID, devices.List())
+		return append(devices.List(), ngm.defaultDevices...), nil
 	}
 	// Get GPU devices in use.
 	devicesInUse := ngm.allocated.devices()
@@ -164,7 +161,7 @@ func (ngm *nvidiaGPUManager) AllocateGPU(pod *v1.Pod, container *v1.Container) (
 	ret := available.UnsortedList()[:gpusNeeded]
 	for _, device := range ret {
 		// Update internal allocated GPU cache.
-		ngm.allocated.insert(string(pod.UID), device)
+		ngm.allocated.insert(string(pod.UID), container.Name, device)
 	}
 	// Add standard devices files that needs to be exposed.
 	ret = append(ret, ngm.defaultDevices...)
@@ -173,13 +170,10 @@ func (ngm *nvidiaGPUManager) AllocateGPU(pod *v1.Pod, container *v1.Container) (
 }
 
 // updateAllocatedGPUs updates the list of GPUs in use.
-// It gets a list of running pods and then frees any GPUs that are bound to terminated pods.
+// It gets a list of active pods and then frees any GPUs that are bound to terminated pods.
 // Returns error on failure.
-func (ngm *nvidiaGPUManager) updateAllocatedGPUs() error {
-	activePods, err := ngm.activePodsLister.GetRunningPods()
-	if err != nil {
-		return fmt.Errorf("Failed to list active pods: %v", err)
-	}
+func (ngm *nvidiaGPUManager) updateAllocatedGPUs() {
+	activePods := ngm.activePodsLister.GetActivePods()
 	activePodUids := sets.NewString()
 	for _, pod := range activePods {
 		activePodUids.Insert(string(pod.UID))
@@ -188,7 +182,6 @@ func (ngm *nvidiaGPUManager) updateAllocatedGPUs() error {
 	podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
 	glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
 	ngm.allocated.delete(podsToBeRemoved.List())
-	return nil
 }
 
 // discoverGPUs identifies allGPUs NVIDIA GPU devices available on the local node by walking `/dev` directory.
@@ -217,14 +210,15 @@ func (ngm *nvidiaGPUManager) discoverGPUs() error {
 }
 
 // gpusInUse returns a list of GPUs in use along with the respective pods that are using it.
-func (ngm *nvidiaGPUManager) gpusInUse() (*podGPUs, error) {
-	pods, err := ngm.activePodsLister.GetRunningPods()
-	if err != nil {
-		return nil, err
+func (ngm *nvidiaGPUManager) gpusInUse() *podGPUs {
+	pods := ngm.activePodsLister.GetActivePods()
+	type containerIdentifier struct {
+		id   string
+		name string
 	}
 	type podContainers struct {
-		uid          string
-		containerIDs sets.String
+		uid        string
+		containers []containerIdentifier
 	}
 	// List of containers to inspect.
 	podContainersToInspect := []podContainers{}
@@ -240,21 +234,23 @@ func (ngm *nvidiaGPUManager) gpusInUse() (*podGPUs, error) {
 		if containers.Len() == 0 {
 			continue
 		}
-		containerIDs := sets.NewString()
+		// TODO: If kubelet restarts right after allocating a GPU to a pod, the container might not have started yet and so container status might not be available yet.
+		// Use an internal checkpoint instead or try using the CRI if its checkpoint is reliable.
+		var containersToInspect []containerIdentifier
 		for _, container := range pod.Status.ContainerStatuses {
 			if containers.Has(container.Name) {
-				containerIDs.Insert(container.ContainerID)
+				containersToInspect = append(containersToInspect, containerIdentifier{container.ContainerID, container.Name})
 			}
 		}
 		// add the pod and its containers that need to be inspected.
-		podContainersToInspect = append(podContainersToInspect, podContainers{string(pod.UID), containerIDs})
+		podContainersToInspect = append(podContainersToInspect, podContainers{string(pod.UID), containersToInspect})
 	}
 	ret := newPodGPUs()
 	for _, podContainer := range podContainersToInspect {
-		for _, containerId := range podContainer.containerIDs.List() {
-			containerJSON, err := ngm.dockerClient.InspectContainer(containerId)
+		for _, containerIdentifier := range podContainer.containers {
+			containerJSON, err := ngm.dockerClient.InspectContainer(containerIdentifier.id)
 			if err != nil {
-				glog.V(3).Infof("Failed to inspect container %q in pod %q while attempting to reconcile nvidia gpus in use", containerId, podContainer.uid)
+				glog.V(3).Infof("Failed to inspect container %q in pod %q while attempting to reconcile nvidia gpus in use", containerIdentifier.id, podContainer.uid)
 				continue
 			}
 
@@ -266,12 +262,12 @@ func (ngm *nvidiaGPUManager) gpusInUse() (*podGPUs, error) {
 			for _, device := range devices {
 				if isValidPath(device.PathOnHost) {
 					glog.V(4).Infof("Nvidia GPU %q is in use by Docker Container: %q", device.PathOnHost, containerJSON.ID)
-					ret.insert(podContainer.uid, device.PathOnHost)
+					ret.insert(podContainer.uid, containerIdentifier.name, device.PathOnHost)
 				}
 			}
 		}
 	}
-	return ret, nil
+	return ret
 }
 
 func isValidPath(path string) bool {
diff --git a/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager_test.go b/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager_test.go
index aea168ba568..6c893839947 100644
--- a/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager_test.go
+++ b/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager_test.go
@@ -32,12 +32,12 @@ type testActivePodsLister struct {
 	activePods []*v1.Pod
 }
 
-func (tapl *testActivePodsLister) GetRunningPods() ([]*v1.Pod, error) {
-	return tapl.activePods, nil
+func (tapl *testActivePodsLister) GetActivePods() []*v1.Pod {
+	return tapl.activePods
 }
 
-func makeTestPod(numContainers int) *v1.Pod {
-	quantity := resource.NewQuantity(1, resource.DecimalSI)
+func makeTestPod(numContainers, gpusPerContainer int) *v1.Pod {
+	quantity := resource.NewQuantity(int64(gpusPerContainer), resource.DecimalSI)
 	resources := v1.ResourceRequirements{
 		Limits: v1.ResourceList{
 			v1.ResourceNvidiaGPU: *quantity,
@@ -53,6 +53,7 @@
 	}
 	for ; numContainers > 0; numContainers-- {
 		pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
+			Name:      string(uuid.NewUUID()),
 			Resources: resources,
 		})
 	}
@@ -69,13 +70,12 @@ func TestMultiContainerPodGPUAllocation(t *testing.T) {
 	}
 
 	// Expect that no devices are in use.
-	gpusInUse, err := testGpuManager.gpusInUse()
+	gpusInUse := testGpuManager.gpusInUse()
 	as := assert.New(t)
-	as.Nil(err)
 	as.Equal(len(gpusInUse.devices()), 0)
 
 	// Allocated GPUs for a pod with two containers.
-	pod := makeTestPod(2)
+	pod := makeTestPod(2, 1)
 	// Allocate for the first container.
 	devices1, err := testGpuManager.AllocateGPU(pod, &pod.Spec.Containers[0])
 	as.Nil(err)
@@ -90,7 +90,7 @@ func TestMultiContainerPodGPUAllocation(t *testing.T) {
 	as.NotEqual(devices1, devices2, "expected containers to get different devices")
 
 	// further allocations should fail.
-	newPod := makeTestPod(2)
+	newPod := makeTestPod(2, 1)
 	devices1, err = testGpuManager.AllocateGPU(newPod, &newPod.Spec.Containers[0])
 	as.NotNil(err, "expected gpu allocation to fail. got: %v", devices1)
 
@@ -120,13 +120,12 @@ func TestMultiPodGPUAllocation(t *testing.T) {
 	}
 
 	// Expect that no devices are in use.
-	gpusInUse, err := testGpuManager.gpusInUse()
+	gpusInUse := testGpuManager.gpusInUse()
 	as := assert.New(t)
-	as.Nil(err)
 	as.Equal(len(gpusInUse.devices()), 0)
 
 	// Allocated GPUs for a pod with two containers.
-	podA := makeTestPod(1)
+	podA := makeTestPod(1, 1)
 	// Allocate for the first container.
 	devicesA, err := testGpuManager.AllocateGPU(podA, &podA.Spec.Containers[0])
 	as.Nil(err)
@@ -135,10 +134,47 @@ func TestMultiPodGPUAllocation(t *testing.T) {
 	podLister.activePods = append(podLister.activePods, podA)
 
 	// further allocations should fail.
-	podB := makeTestPod(1)
+	podB := makeTestPod(1, 1)
 	// Allocate for the first container.
 	devicesB, err := testGpuManager.AllocateGPU(podB, &podB.Spec.Containers[0])
 	as.Nil(err)
 	as.Equal(len(devicesB), 1)
 	as.NotEqual(devicesA, devicesB, "expected pods to get different devices")
 }
+
+func TestPodContainerRestart(t *testing.T) {
+	podLister := &testActivePodsLister{}
+
+	testGpuManager := &nvidiaGPUManager{
+		activePodsLister: podLister,
+		allGPUs:          sets.NewString("/dev/nvidia0", "/dev/nvidia1"),
+		allocated:        newPodGPUs(),
+		defaultDevices:   []string{"/dev/nvidia-smi"},
+	}
+
+	// Expect that no devices are in use.
+	gpusInUse := testGpuManager.gpusInUse()
+	as := assert.New(t)
+	as.Equal(len(gpusInUse.devices()), 0)
+
+	// Make a pod with one container that requests two GPUs.
+	podA := makeTestPod(1, 2)
+	// Allocate GPUs
+	devicesA, err := testGpuManager.AllocateGPU(podA, &podA.Spec.Containers[0])
+	as.Nil(err)
+	as.Equal(len(devicesA), 3)
+
+	podLister.activePods = append(podLister.activePods, podA)
+
+	// further allocations should fail.
+	podB := makeTestPod(1, 1)
+	_, err = testGpuManager.AllocateGPU(podB, &podB.Spec.Containers[0])
+	as.NotNil(err)
+
+	// Allocate GPUs for existing Pod A.
+	// The same GPUs must be returned.
+	devicesAretry, err := testGpuManager.AllocateGPU(podA, &podA.Spec.Containers[0])
+	as.Nil(err)
+	as.Equal(len(devicesA), 3)
+	as.True(sets.NewString(devicesA...).Equal(sets.NewString(devicesAretry...)))
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 14bcce91f8b..9584ef5f1b0 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -792,7 +792,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
 	klet.AddPodSyncHandler(activeDeadlineHandler)
 
-	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
+	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
 	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
 	// apply functional Option's
 	for _, opt := range kubeDeps.Options {
@@ -1204,7 +1204,7 @@ func (kl *Kubelet) initializeModules() error {
 		return fmt.Errorf("Kubelet failed to get node info: %v", err)
 	}
 
-	if err := kl.containerManager.Start(node, kl.getActivePods); err != nil {
+	if err := kl.containerManager.Start(node, kl.GetActivePods); err != nil {
 		return fmt.Errorf("Failed to start ContainerManager %v", err)
 	}
 
@@ -1230,7 +1230,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
 		glog.Fatalf("Failed to start cAdvisor %v", err)
 	}
 	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
-	kl.evictionManager.Start(kl, kl.getActivePods, kl, evictionMonitoringPeriod)
+	kl.evictionManager.Start(kl, kl.GetActivePods, kl, evictionMonitoringPeriod)
 }
 
 // Run starts the kubelet reacting to config updates
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index d96627f9f7f..e1eda16586a 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -76,8 +76,8 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
 	return pods, nil
 }
 
-// getActivePods returns non-terminal pods
-func (kl *Kubelet) getActivePods() []*v1.Pod {
+// GetActivePods returns non-terminal pods
+func (kl *Kubelet) GetActivePods() []*v1.Pod {
 	allPods := kl.podManager.GetPods()
 	activePods := kl.filterOutTerminatedPods(allPods)
 	return activePods
diff --git a/test/e2e_node/gpus.go b/test/e2e_node/gpus.go
index 73763237bb9..1e3eb73748c 100644
--- a/test/e2e_node/gpus.go
+++ b/test/e2e_node/gpus.go
@@ -18,6 +18,7 @@ package e2e_node
 
 import (
 	"fmt"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -79,6 +80,20 @@ var _ = framework.KubeDescribe("GPU [Serial]", func() {
 			podSuccess := makePod(gpusAvailable.Value(), "gpus-success")
 			podSuccess = f.PodClient().CreateSync(podSuccess)
 
+			By("Checking that the containers in the pod have restarted at least twice successfully, thereby ensuring GPUs are reused")
+			const minContainerRestartCount = 2
+			Eventually(func() bool {
+				p, err := f.ClientSet.Core().Pods(f.Namespace.Name).Get(podSuccess.Name, metav1.GetOptions{})
+				if err != nil {
+					framework.Logf("failed to get pod status: %v", err)
+					return false
+				}
+				if p.Status.ContainerStatuses[0].RestartCount < minContainerRestartCount {
+					return false
+				}
+				return true
+			}, time.Minute, time.Second).Should(BeTrue())
+
 			By("Checking if the pod outputted Success to its logs")
 			framework.ExpectNoError(f.PodClient().MatchContainerOutput(podSuccess.Name, podSuccess.Name, "Success"))
 
@@ -115,12 +130,13 @@ func makePod(gpus int64, name string) *v1.Pod {
 			v1.ResourceNvidiaGPU: *resource.NewQuantity(gpus, resource.DecimalSI),
 		},
 	}
-	gpuverificationCmd := fmt.Sprintf("if [[ %d -ne $(ls /dev/ | egrep '^nvidia[0-9]+$') ]]; then exit 1; fi; echo Success; sleep 10240 ", gpus)
+	gpuverificationCmd := fmt.Sprintf("if [[ %d -ne $(ls /dev/ | egrep '^nvidia[0-9]+$') ]]; then exit 1; fi; echo Success", gpus)
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
 		},
 		Spec: v1.PodSpec{
+			RestartPolicy: v1.RestartPolicyAlways,
 			Containers: []v1.Container{
 				{
 					Image: "gcr.io/google_containers/busybox:1.24",
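
Note (not part of the patch): the core change above is that the kubelet's GPU cache is now keyed by pod UID *and* container name, and `AllocateGPU` consults that cache before handing out new devices, which is what lets a restarting container get its original GPUs back. The sketch below mirrors that bookkeeping with a plain `map[string]bool` standing in for `sets.String` from `k8s.io/apimachinery`, so it runs without Kubernetes dependencies; the type and method names echo `helpers.go`, but the snippet is illustrative only.

```go
// Illustrative sketch of the per-container GPU bookkeeping introduced in
// pkg/kubelet/gpu/nvidia/helpers.go. A plain map stands in for sets.String.
package main

import "fmt"

// gpuSet is a stand-in for sets.String.
type gpuSet map[string]bool

// containerToGPU maps a container name to the GPUs assigned to it.
type containerToGPU map[string]gpuSet

// podGPUs maps pod UID -> container name -> assigned GPU devices.
type podGPUs struct {
	podGPUMapping map[string]containerToGPU
}

func newPodGPUs() *podGPUs {
	return &podGPUs{podGPUMapping: make(map[string]containerToGPU)}
}

// insert records that a device is assigned to a specific container of a pod.
func (p *podGPUs) insert(podUID, contName, device string) {
	if _, ok := p.podGPUMapping[podUID]; !ok {
		p.podGPUMapping[podUID] = make(containerToGPU)
	}
	if _, ok := p.podGPUMapping[podUID][contName]; !ok {
		p.podGPUMapping[podUID][contName] = make(gpuSet)
	}
	p.podGPUMapping[podUID][contName][device] = true
}

// getGPUs returns the devices previously assigned to a container, or nil.
func (p *podGPUs) getGPUs(podUID, contName string) gpuSet {
	containers, ok := p.podGPUMapping[podUID]
	if !ok {
		return nil
	}
	return containers[contName]
}

func main() {
	cache := newPodGPUs()
	// First allocation for container "main" of pod "pod-1".
	cache.insert("pod-1", "main", "/dev/nvidia0")

	// On a container restart, AllocateGPU-style code consults the cache first
	// and hands back the already-assigned device instead of a fresh one.
	if devices := cache.getGPUs("pod-1", "main"); devices != nil {
		fmt.Println("re-using pre-allocated GPUs:", devices)
	}
}
```

Keying the cache by container name rather than container ID is what makes the lookup stable across restarts: a restarted container gets a new ID but keeps its name, which is exactly the case `TestPodContainerRestart` and the new e2e restart check exercise.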