Merge pull request #42942 from vishh/gpu-cont-fix

Automatic merge from submit-queue (batch tested with PRs 42942, 42935)

[Bug] Handle container restarts and avoid using runtime pod cache while allocating GPUs

Fixes #42412

**Background**
Support for multiple GPUs is an experimental feature in v1.6.
Container restarts were handled incorrectly, which resulted in GPUs being stranded.
The kubelet was incorrectly using the runtime cache to track running pods, which can lead to race conditions (as it has in other parts of the kubelet). This can result in the same GPU being assigned to multiple pods.

**What does this PR do**
This PR tracks the assignment of GPUs to individual containers and returns pre-allocated GPUs instead of (incorrectly) allocating new ones.
The GPU manager is updated to consume a list of active pods derived from the apiserver cache instead of the runtime cache.
The node e2e suite has been extended to validate this failure scenario.
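
For illustration only, here is a minimal, self-contained Go sketch of the bookkeeping this PR introduces; the type and function names below are simplified stand-ins, not the actual kubelet code (the real implementation is in the diffs further down). The idea: keep a per-pod, per-container map of assigned devices, and have a repeat request from the same container return its existing assignment instead of fresh GPUs.

```go
package main

import "fmt"

// gpuSet is a set of GPU device paths, e.g. "/dev/nvidia0".
type gpuSet map[string]bool

// podGPUs maps pod UID -> container name -> devices assigned to that container.
// (The real kubelet type uses sets.String values; plain maps keep this sketch small.)
type podGPUs map[string]map[string]gpuSet

// allocate returns the devices already assigned to (podUID, container), if any,
// so a restarted container reuses its GPUs instead of stranding them. Otherwise
// it carves the request out of the free list and records the assignment.
func (p podGPUs) allocate(podUID, container string, free []string, n int) ([]string, error) {
	if devs, ok := p[podUID][container]; ok {
		reuse := make([]string, 0, len(devs))
		for d := range devs {
			reuse = append(reuse, d)
		}
		return reuse, nil
	}
	if len(free) < n {
		return nil, fmt.Errorf("requested %d GPUs, only %d free", n, len(free))
	}
	if p[podUID] == nil {
		p[podUID] = map[string]gpuSet{}
	}
	assigned := gpuSet{}
	for _, d := range free[:n] {
		assigned[d] = true
	}
	p[podUID][container] = assigned
	return free[:n], nil
}

func main() {
	cache := podGPUs{}
	first, _ := cache.allocate("pod-a", "main", []string{"/dev/nvidia0", "/dev/nvidia1"}, 1)
	// The container restarts and asks again: it gets its original device back.
	again, _ := cache.allocate("pod-a", "main", []string{"/dev/nvidia1"}, 1)
	fmt.Println(first, again) // [/dev/nvidia0] [/dev/nvidia0]
}
```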

**Risk**
Minimal/None, since support for GPUs is an experimental feature that is turned off by default. The code is also isolated to the GPU manager in the kubelet.

**Workarounds**
In the absence of this PR, users can mitigate the original issue by setting the restart policy of their pods to `Never` (`RestartPolicyNever`).
There is no workaround for the race condition caused by using the runtime cache, though.
Hence it is worth including this fix in v1.6.0.
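
As a sketch of the workaround when building a pod object programmatically (names, the image, and the in-tree import paths below are illustrative assumptions, not taken from this PR): setting the pod's restart policy to `Never` prevents the restart path that triggers the double allocation.

```go
package main

import (
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
)

// workaroundPod builds a GPU pod that is never restarted, so a container
// restart cannot cause the kubelet to hand out a second set of GPUs.
func workaroundPod() *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "gpu-workload"}, // illustrative name
		Spec: v1.PodSpec{
			RestartPolicy: v1.RestartPolicyNever, // the workaround
			Containers: []v1.Container{{
				Name:  "cuda-job", // illustrative name
				Image: "gcr.io/google_containers/busybox:1.24",
				Resources: v1.ResourceRequirements{
					Limits: v1.ResourceList{
						v1.ResourceNvidiaGPU: *resource.NewQuantity(1, resource.DecimalSI),
					},
				},
			}},
		},
	}
}

func main() { _ = workaroundPod() }
```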

cc @jianzhangbjz @seelam @kubernetes/sig-node-pr-reviews 

Replaces #42560
Kubernetes Submit Queue authored on 2017-03-14 10:19:17 -07:00, committed by GitHub
commit 6de28fab7d
7 changed files with 128 additions and 61 deletions

View File

@@ -185,6 +185,7 @@ pkg/kubelet/container
 pkg/kubelet/envvars
 pkg/kubelet/eviction
 pkg/kubelet/eviction/api
+pkg/kubelet/gpu/nvidia
 pkg/kubelet/util/csr
 pkg/kubelet/util/format
 pkg/kubelet/util/ioutils

View File

@@ -18,14 +18,16 @@ package nvidia
 import "k8s.io/apimachinery/pkg/util/sets"
 
+type containerToGPU map[string]sets.String
+
 // podGPUs represents a list of pod to GPU mappings.
 type podGPUs struct {
-    podGPUMapping map[string]sets.String
+    podGPUMapping map[string]containerToGPU
 }
 
 func newPodGPUs() *podGPUs {
     return &podGPUs{
-        podGPUMapping: map[string]sets.String{},
+        podGPUMapping: make(map[string]containerToGPU),
     }
 }
 
 func (pgpu *podGPUs) pods() sets.String {
@@ -36,12 +38,26 @@ func (pgpu *podGPUs) pods() sets.String {
     return ret
 }
 
-func (pgpu *podGPUs) insert(podUID string, device string) {
+func (pgpu *podGPUs) insert(podUID, contName string, device string) {
     if _, exists := pgpu.podGPUMapping[podUID]; !exists {
-        pgpu.podGPUMapping[podUID] = sets.NewString(device)
-    } else {
-        pgpu.podGPUMapping[podUID].Insert(device)
+        pgpu.podGPUMapping[podUID] = make(containerToGPU)
     }
+    if _, exists := pgpu.podGPUMapping[podUID][contName]; !exists {
+        pgpu.podGPUMapping[podUID][contName] = sets.NewString()
+    }
+    pgpu.podGPUMapping[podUID][contName].Insert(device)
+}
+
+func (pgpu *podGPUs) getGPUs(podUID, contName string) sets.String {
+    containers, exists := pgpu.podGPUMapping[podUID]
+    if !exists {
+        return nil
+    }
+    devices, exists := containers[contName]
+    if !exists {
+        return nil
+    }
+    return devices
 }
 
 func (pgpu *podGPUs) delete(pods []string) {
@@ -52,8 +68,10 @@ func (pgpu *podGPUs) delete(pods []string) {
 
 func (pgpu *podGPUs) devices() sets.String {
     ret := sets.NewString()
-    for _, devices := range pgpu.podGPUMapping {
-        ret = ret.Union(devices)
+    for _, containerToGPU := range pgpu.podGPUMapping {
+        for _, deviceSet := range containerToGPU {
+            ret = ret.Union(deviceSet)
+        }
     }
     return ret
 }
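
For context, a short illustrative test (not part of this commit) exercising the per-container mapping above. It assumes the `podGPUs` helpers exactly as shown in the diff and would live in the same `nvidia` package:

```go
package nvidia

import "testing"

// TestPodGPUsSketch is an illustrative exercise of the new per-container
// bookkeeping: a repeated lookup for the same (pod, container) pair returns
// the device that was previously inserted.
func TestPodGPUsSketch(t *testing.T) {
	cache := newPodGPUs()
	cache.insert("pod-uid-1", "main", "/dev/nvidia0")

	// A restarted container looks itself up and finds its previous assignment.
	devs := cache.getGPUs("pod-uid-1", "main")
	if devs == nil || !devs.Has("/dev/nvidia0") {
		t.Fatalf("expected /dev/nvidia0 to be tracked for pod-uid-1/main, got %v", devs)
	}

	// devices() flattens the per-container sets into everything currently in use.
	if inUse := cache.devices(); inUse.Len() != 1 {
		t.Fatalf("expected exactly one device in use, got %v", inUse.List())
	}
}
```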

View File

@@ -48,7 +48,7 @@ const (
 
 type activePodsLister interface {
     // Returns a list of active pods on the node.
-    GetRunningPods() ([]*v1.Pod, error)
+    GetActivePods() []*v1.Pod
 }
 
 // nvidiaGPUManager manages nvidia gpu devices.
@@ -102,10 +102,7 @@ func (ngm *nvidiaGPUManager) Start() error {
         return err
     }
     // It's possible that the runtime isn't available now.
-    allocatedGPUs, err := ngm.gpusInUse()
-    if err == nil {
-        ngm.allocated = allocatedGPUs
-    }
+    ngm.allocated = ngm.gpusInUse()
     // We ignore errors when identifying allocated GPUs because it is possible that the runtime interfaces may be not be logically up.
     return nil
 }
@@ -141,16 +138,16 @@ func (ngm *nvidiaGPUManager) AllocateGPU(pod *v1.Pod, container *v1.Container) (
     defer ngm.Unlock()
     if ngm.allocated == nil {
         // Initialization is not complete. Try now. Failures can no longer be tolerated.
-        allocated, err := ngm.gpusInUse()
-        if err != nil {
-            return nil, fmt.Errorf("Failed to allocate GPUs because of issues identifying GPUs in use: %v", err)
-        }
-        ngm.allocated = allocated
+        ngm.allocated = ngm.gpusInUse()
     } else {
         // update internal list of GPUs in use prior to allocating new GPUs.
-        if err := ngm.updateAllocatedGPUs(); err != nil {
-            return nil, fmt.Errorf("Failed to allocate GPUs because of issues with updating GPUs in use: %v", err)
-        }
+        ngm.updateAllocatedGPUs()
+    }
+    // Check if GPUs have already been allocated. If so return them right away.
+    // This can happen if a container restarts for example.
+    if devices := ngm.allocated.getGPUs(string(pod.UID), container.Name); devices != nil {
+        glog.V(2).Infof("Found pre-allocated GPUs for container %q in Pod %q: %v", container.Name, pod.UID, devices.List())
+        return append(devices.List(), ngm.defaultDevices...), nil
     }
     // Get GPU devices in use.
     devicesInUse := ngm.allocated.devices()
@@ -164,7 +161,7 @@ func (ngm *nvidiaGPUManager) AllocateGPU(pod *v1.Pod, container *v1.Container) (
     ret := available.UnsortedList()[:gpusNeeded]
     for _, device := range ret {
         // Update internal allocated GPU cache.
-        ngm.allocated.insert(string(pod.UID), device)
+        ngm.allocated.insert(string(pod.UID), container.Name, device)
     }
     // Add standard devices files that needs to be exposed.
     ret = append(ret, ngm.defaultDevices...)
@@ -173,13 +170,10 @@ func (ngm *nvidiaGPUManager) AllocateGPU(pod *v1.Pod, container *v1.Container) (
 }
 
 // updateAllocatedGPUs updates the list of GPUs in use.
-// It gets a list of running pods and then frees any GPUs that are bound to terminated pods.
+// It gets a list of active pods and then frees any GPUs that are bound to terminated pods.
 // Returns error on failure.
-func (ngm *nvidiaGPUManager) updateAllocatedGPUs() error {
-    activePods, err := ngm.activePodsLister.GetRunningPods()
-    if err != nil {
-        return fmt.Errorf("Failed to list active pods: %v", err)
-    }
+func (ngm *nvidiaGPUManager) updateAllocatedGPUs() {
+    activePods := ngm.activePodsLister.GetActivePods()
     activePodUids := sets.NewString()
     for _, pod := range activePods {
         activePodUids.Insert(string(pod.UID))
@@ -188,7 +182,6 @@ func (ngm *nvidiaGPUManager) updateAllocatedGPUs() error {
     podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
     glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
     ngm.allocated.delete(podsToBeRemoved.List())
-    return nil
 }
 
 // discoverGPUs identifies allGPUs NVIDIA GPU devices available on the local node by walking `/dev` directory.
@@ -217,14 +210,15 @@ func (ngm *nvidiaGPUManager) discoverGPUs() error {
 }
 
 // gpusInUse returns a list of GPUs in use along with the respective pods that are using it.
-func (ngm *nvidiaGPUManager) gpusInUse() (*podGPUs, error) {
-    pods, err := ngm.activePodsLister.GetRunningPods()
-    if err != nil {
-        return nil, err
+func (ngm *nvidiaGPUManager) gpusInUse() *podGPUs {
+    pods := ngm.activePodsLister.GetActivePods()
+    type containerIdentifier struct {
+        id   string
+        name string
     }
     type podContainers struct {
-        uid          string
-        containerIDs sets.String
+        uid        string
+        containers []containerIdentifier
     }
     // List of containers to inspect.
     podContainersToInspect := []podContainers{}
@@ -240,21 +234,23 @@ func (ngm *nvidiaGPUManager) gpusInUse() (*podGPUs, error) {
         if containers.Len() == 0 {
             continue
         }
-        containerIDs := sets.NewString()
+        // TODO: If kubelet restarts right after allocating a GPU to a pod, the container might not have started yet and so container status might not be available yet.
+        // Use an internal checkpoint instead or try using the CRI if its checkpoint is reliable.
+        var containersToInspect []containerIdentifier
         for _, container := range pod.Status.ContainerStatuses {
             if containers.Has(container.Name) {
-                containerIDs.Insert(container.ContainerID)
+                containersToInspect = append(containersToInspect, containerIdentifier{container.ContainerID, container.Name})
             }
         }
         // add the pod and its containers that need to be inspected.
-        podContainersToInspect = append(podContainersToInspect, podContainers{string(pod.UID), containerIDs})
+        podContainersToInspect = append(podContainersToInspect, podContainers{string(pod.UID), containersToInspect})
     }
     ret := newPodGPUs()
     for _, podContainer := range podContainersToInspect {
-        for _, containerId := range podContainer.containerIDs.List() {
-            containerJSON, err := ngm.dockerClient.InspectContainer(containerId)
+        for _, containerIdentifier := range podContainer.containers {
+            containerJSON, err := ngm.dockerClient.InspectContainer(containerIdentifier.id)
             if err != nil {
-                glog.V(3).Infof("Failed to inspect container %q in pod %q while attempting to reconcile nvidia gpus in use", containerId, podContainer.uid)
+                glog.V(3).Infof("Failed to inspect container %q in pod %q while attempting to reconcile nvidia gpus in use", containerIdentifier.id, podContainer.uid)
                 continue
             }
@@ -266,12 +262,12 @@ func (ngm *nvidiaGPUManager) gpusInUse() (*podGPUs, error) {
             for _, device := range devices {
                 if isValidPath(device.PathOnHost) {
                     glog.V(4).Infof("Nvidia GPU %q is in use by Docker Container: %q", device.PathOnHost, containerJSON.ID)
-                    ret.insert(podContainer.uid, device.PathOnHost)
+                    ret.insert(podContainer.uid, containerIdentifier.name, device.PathOnHost)
                 }
             }
         }
     }
-    return ret, nil
+    return ret
 }
 
 func isValidPath(path string) bool {

View File

@@ -32,12 +32,12 @@ type testActivePodsLister struct {
     activePods []*v1.Pod
 }
 
-func (tapl *testActivePodsLister) GetRunningPods() ([]*v1.Pod, error) {
-    return tapl.activePods, nil
+func (tapl *testActivePodsLister) GetActivePods() []*v1.Pod {
+    return tapl.activePods
 }
 
-func makeTestPod(numContainers int) *v1.Pod {
-    quantity := resource.NewQuantity(1, resource.DecimalSI)
+func makeTestPod(numContainers, gpusPerContainer int) *v1.Pod {
+    quantity := resource.NewQuantity(int64(gpusPerContainer), resource.DecimalSI)
     resources := v1.ResourceRequirements{
         Limits: v1.ResourceList{
             v1.ResourceNvidiaGPU: *quantity,
@@ -53,6 +53,7 @@ func makeTestPod(numContainers int) *v1.Pod {
     }
     for ; numContainers > 0; numContainers-- {
         pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
+            Name:      string(uuid.NewUUID()),
             Resources: resources,
         })
     }
@@ -69,13 +70,12 @@ func TestMultiContainerPodGPUAllocation(t *testing.T) {
     }
 
     // Expect that no devices are in use.
-    gpusInUse, err := testGpuManager.gpusInUse()
+    gpusInUse := testGpuManager.gpusInUse()
     as := assert.New(t)
-    as.Nil(err)
     as.Equal(len(gpusInUse.devices()), 0)
 
     // Allocated GPUs for a pod with two containers.
-    pod := makeTestPod(2)
+    pod := makeTestPod(2, 1)
     // Allocate for the first container.
     devices1, err := testGpuManager.AllocateGPU(pod, &pod.Spec.Containers[0])
     as.Nil(err)
@@ -90,7 +90,7 @@ func TestMultiContainerPodGPUAllocation(t *testing.T) {
     as.NotEqual(devices1, devices2, "expected containers to get different devices")
 
     // further allocations should fail.
-    newPod := makeTestPod(2)
+    newPod := makeTestPod(2, 1)
     devices1, err = testGpuManager.AllocateGPU(newPod, &newPod.Spec.Containers[0])
     as.NotNil(err, "expected gpu allocation to fail. got: %v", devices1)
@@ -120,13 +120,12 @@ func TestMultiPodGPUAllocation(t *testing.T) {
     }
 
     // Expect that no devices are in use.
-    gpusInUse, err := testGpuManager.gpusInUse()
+    gpusInUse := testGpuManager.gpusInUse()
     as := assert.New(t)
-    as.Nil(err)
     as.Equal(len(gpusInUse.devices()), 0)
 
     // Allocated GPUs for a pod with two containers.
-    podA := makeTestPod(1)
+    podA := makeTestPod(1, 1)
     // Allocate for the first container.
     devicesA, err := testGpuManager.AllocateGPU(podA, &podA.Spec.Containers[0])
     as.Nil(err)
@@ -135,10 +134,47 @@ func TestMultiPodGPUAllocation(t *testing.T) {
     podLister.activePods = append(podLister.activePods, podA)
 
     // further allocations should fail.
-    podB := makeTestPod(1)
+    podB := makeTestPod(1, 1)
     // Allocate for the first container.
     devicesB, err := testGpuManager.AllocateGPU(podB, &podB.Spec.Containers[0])
     as.Nil(err)
     as.Equal(len(devicesB), 1)
     as.NotEqual(devicesA, devicesB, "expected pods to get different devices")
 }
+
+func TestPodContainerRestart(t *testing.T) {
+    podLister := &testActivePodsLister{}
+
+    testGpuManager := &nvidiaGPUManager{
+        activePodsLister: podLister,
+        allGPUs:          sets.NewString("/dev/nvidia0", "/dev/nvidia1"),
+        allocated:        newPodGPUs(),
+        defaultDevices:   []string{"/dev/nvidia-smi"},
+    }
+
+    // Expect that no devices are in use.
+    gpusInUse := testGpuManager.gpusInUse()
+    as := assert.New(t)
+    as.Equal(len(gpusInUse.devices()), 0)
+
+    // Make a pod with one containers that requests two GPUs.
+    podA := makeTestPod(1, 2)
+    // Allocate GPUs
+    devicesA, err := testGpuManager.AllocateGPU(podA, &podA.Spec.Containers[0])
+    as.Nil(err)
+    as.Equal(len(devicesA), 3)
+    podLister.activePods = append(podLister.activePods, podA)
+
+    // further allocations should fail.
+    podB := makeTestPod(1, 1)
+    _, err = testGpuManager.AllocateGPU(podB, &podB.Spec.Containers[0])
+    as.NotNil(err)
+
+    // Allcate GPU for existing Pod A.
+    // The same gpus must be returned.
+    devicesAretry, err := testGpuManager.AllocateGPU(podA, &podA.Spec.Containers[0])
+    as.Nil(err)
+    as.Equal(len(devicesA), 3)
+    as.True(sets.NewString(devicesA...).Equal(sets.NewString(devicesAretry...)))
+}

View File

@@ -792,7 +792,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
     klet.AddPodSyncLoopHandler(activeDeadlineHandler)
     klet.AddPodSyncHandler(activeDeadlineHandler)
-    criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
+    criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
     klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
     // apply functional Option's
     for _, opt := range kubeDeps.Options {
@@ -1204,7 +1204,7 @@ func (kl *Kubelet) initializeModules() error {
         return fmt.Errorf("Kubelet failed to get node info: %v", err)
     }
 
-    if err := kl.containerManager.Start(node, kl.getActivePods); err != nil {
+    if err := kl.containerManager.Start(node, kl.GetActivePods); err != nil {
         return fmt.Errorf("Failed to start ContainerManager %v", err)
     }
@@ -1230,7 +1230,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
         glog.Fatalf("Failed to start cAdvisor %v", err)
     }
     // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
-    kl.evictionManager.Start(kl, kl.getActivePods, kl, evictionMonitoringPeriod)
+    kl.evictionManager.Start(kl, kl.GetActivePods, kl, evictionMonitoringPeriod)
 }
 
 // Run starts the kubelet reacting to config updates

View File

@@ -76,8 +76,8 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
     return pods, nil
 }
 
-// getActivePods returns non-terminal pods
-func (kl *Kubelet) getActivePods() []*v1.Pod {
+// GetActivePods returns non-terminal pods
+func (kl *Kubelet) GetActivePods() []*v1.Pod {
     allPods := kl.podManager.GetPods()
     activePods := kl.filterOutTerminatedPods(allPods)
     return activePods

View File

@@ -18,6 +18,7 @@ package e2e_node
 
 import (
     "fmt"
+    "time"
 
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -79,6 +80,20 @@ var _ = framework.KubeDescribe("GPU [Serial]", func() {
         podSuccess := makePod(gpusAvailable.Value(), "gpus-success")
         podSuccess = f.PodClient().CreateSync(podSuccess)
 
+        By("Checking the containers in the pod had restarted at-least twice successfully thereby ensuring GPUs are reused")
+        const minContainerRestartCount = 2
+        Eventually(func() bool {
+            p, err := f.ClientSet.Core().Pods(f.Namespace.Name).Get(podSuccess.Name, metav1.GetOptions{})
+            if err != nil {
+                framework.Logf("failed to get pod status: %v", err)
+                return false
+            }
+            if p.Status.ContainerStatuses[0].RestartCount < minContainerRestartCount {
+                return false
+            }
+            return true
+        }, time.Minute, time.Second).Should(BeTrue())
+
         By("Checking if the pod outputted Success to its logs")
         framework.ExpectNoError(f.PodClient().MatchContainerOutput(podSuccess.Name, podSuccess.Name, "Success"))
@@ -115,12 +130,13 @@ func makePod(gpus int64, name string) *v1.Pod {
             v1.ResourceNvidiaGPU: *resource.NewQuantity(gpus, resource.DecimalSI),
         },
     }
-    gpuverificationCmd := fmt.Sprintf("if [[ %d -ne $(ls /dev/ | egrep '^nvidia[0-9]+$') ]]; then exit 1; fi; echo Success; sleep 10240 ", gpus)
+    gpuverificationCmd := fmt.Sprintf("if [[ %d -ne $(ls /dev/ | egrep '^nvidia[0-9]+$') ]]; then exit 1; fi; echo Success", gpus)
     return &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
             Name: name,
         },
         Spec: v1.PodSpec{
+            RestartPolicy: v1.RestartPolicyAlways,
             Containers: []v1.Container{
                 {
                     Image: "gcr.io/google_containers/busybox:1.24",