Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-30 15:05:27 +00:00
Merge pull request #55088 from jiayingz/capacity
Automatic merge from submit-queue (batch tested with PRs 56021, 55843, 55088, 56117, 55859). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Extends deviceplugin to gracefully handle the full device plugin lifecycle.

**What this PR does / why we need it**:
- Instead of using the cm.capacity field to communicate device plugin resource capacity, this PR switches to an explicit cm.GetDevicePluginResourceCapacity() function that returns device plugin resource capacity as well as any inactive device plugin resources. Kubelet syncNodeStatus calls this function during its periodic run to update node status capacity and allocatable. After this call, the device plugin manager can remove the inactive resources from its allDevices field, since the update has already been pushed to the API server.
- Extends the device plugin checkpoint data to record registered resources, so that resource removal can complete even across kubelet restarts.
- Passes sourcesReady from the kubelet to the device plugin manager, to avoid removing inactive pods during the grace period of a kubelet restart.
- Extends the gpu_device_plugin e2e_node test to verify that scheduled pods can continue to run even after device plugin deletion and kubelet restarts.

**Which issue(s) this PR fixes**:
Together with https://github.com/kubernetes/kubernetes/pull/54488, fixes https://github.com/kubernetes/kubernetes/issues/53395

**Special notes for your reviewer**:

**Release note**:
```release-note
Extends deviceplugin to gracefully handle full device plugin lifecycle.
```
Commit 5242f01e8c
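At a high level, the PR moves device plugin capacity reporting from a push model (a callback writing into cm.capacity) to a pull model: during its periodic node status sync, the kubelet asks the container manager for the current device plugin capacity plus the list of resources whose plugins have gone away, and updates node status from both. A rough, self-contained sketch of that consumer-side flow follows; the fakeDevicePluginCapacity helper and the resource names are illustrative, not part of the change:

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// fakeDevicePluginCapacity stands in for cm.GetDevicePluginResourceCapacity():
// it returns the currently registered device plugin capacity plus the names of
// resources whose plugins are no longer active.
func fakeDevicePluginCapacity() (v1.ResourceList, []string) {
	return v1.ResourceList{
		"nvidia.com/gpu": *resource.NewQuantity(2, resource.DecimalSI),
	}, []string{"vendor.example.com/stale-resource"}
}

func main() {
	node := &v1.Node{}
	node.Status.Capacity = v1.ResourceList{}

	capacity, removed := fakeDevicePluginCapacity()
	// Mirror the active device plugin resources into node status capacity.
	for name, quantity := range capacity {
		node.Status.Capacity[name] = quantity
	}
	// Drop resources whose device plugins were removed; the manager can then
	// forget them too, since the API server has been told.
	for _, name := range removed {
		delete(node.Status.Capacity, v1.ResourceName(name))
	}
	fmt.Println(node.Status.Capacity)
}
```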
```diff
@@ -34,6 +34,7 @@ go_library(
         "//pkg/kubelet/apis/cri:go_default_library",
         "//pkg/kubelet/cadvisor:go_default_library",
         "//pkg/kubelet/cm/cpumanager:go_default_library",
+        "//pkg/kubelet/config:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/eviction/api:go_default_library",
         "//pkg/kubelet/lifecycle:go_default_library",
```
```diff
@@ -23,6 +23,7 @@ import (
     // TODO: Migrate kubelet to either use its own internal objects or client library.
     "k8s.io/api/core/v1"
     internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -41,7 +42,7 @@ type ContainerManager interface {
     // Runs the container manager's housekeeping.
     // - Ensures that the Docker daemon is in a container.
     // - Creates the system container where all non-containerized processes run.
-    Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error
+    Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error

     // SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
     // These cgroups include the system and Kubernetes services.
@@ -69,6 +70,10 @@ type ContainerManager interface {
     // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
     GetCapacity() v1.ResourceList

+    // GetDevicePluginResourceCapacity returns the amount of device plugin resources available on the node
+    // and inactive device plugin resources previously registered on the node.
+    GetDevicePluginResourceCapacity() (v1.ResourceList, []string)
+
     // UpdateQOSCgroups performs housekeeping updates to ensure that the top
     // level QoS containers have their desired state in a thread-safe way
     UpdateQOSCgroups() error
```
```diff
@@ -47,6 +47,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
     "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin"
     cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/qos"
@@ -262,21 +263,9 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
         qosContainerManager: qosContainerManager,
     }

-    updateDeviceCapacityFunc := func(updates v1.ResourceList) {
-        cm.Lock()
-        defer cm.Unlock()
-        for k, v := range updates {
-            if v.Value() <= 0 {
-                delete(cm.capacity, k)
-            } else {
-                cm.capacity[k] = v
-            }
-        }
-    }
-
     glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
     if devicePluginEnabled {
-        cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc)
+        cm.devicePluginManager, err = deviceplugin.NewManagerImpl()
     } else {
         cm.devicePluginManager, err = deviceplugin.NewManagerStub()
     }
@@ -530,6 +519,7 @@ func (cm *containerManagerImpl) Status() Status {

 func (cm *containerManagerImpl) Start(node *v1.Node,
     activePods ActivePodsFunc,
+    sourcesReady config.SourcesReady,
     podStatusProvider status.PodStatusProvider,
     runtimeService internalapi.RuntimeService) error {

@@ -597,7 +587,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
     }, time.Second, stopChan)

     // Starts device plugin manager.
-    if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
+    if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods), sourcesReady); err != nil {
         return err
     }
     return nil
@@ -896,3 +886,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
     defer cm.RUnlock()
     return cm.capacity
 }
+
+func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
+    return cm.devicePluginManager.GetCapacity()
+}
```
```diff
@@ -22,6 +22,7 @@ import (

     internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/status"
@@ -32,7 +33,7 @@ type containerManagerStub struct{}

 var _ ContainerManager = &containerManagerStub{}

-func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
+func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
     glog.V(2).Infof("Starting stub container manager")
     return nil
 }
@@ -69,6 +70,10 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
     return nil
 }

+func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
+    return nil, []string{}
+}
+
 func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
     return &podContainerManagerStub{}
 }
```
```diff
@@ -26,6 +26,7 @@ import (
     internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
     "k8s.io/kubernetes/pkg/kubelet/cadvisor"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/status"
@@ -38,7 +39,7 @@ type unsupportedContainerManager struct {

 var _ ContainerManager = &unsupportedContainerManager{}

-func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
+func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
     return fmt.Errorf("Container Manager is unsupported in this build")
 }

@@ -74,6 +75,10 @@ func (cm *unsupportedContainerManager) GetCapacity() v1.ResourceList {
     return nil
 }

+func (cm *unsupportedContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
+    return nil, []string{}
+}
+
 func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerManager {
     return &unsupportedPodContainerManager{}
 }
```
```diff
@@ -25,6 +25,7 @@ import (
     "k8s.io/client-go/tools/record"
     internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
     "k8s.io/kubernetes/pkg/kubelet/cadvisor"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     "k8s.io/kubernetes/pkg/kubelet/status"
     "k8s.io/kubernetes/pkg/util/mount"
 )
@@ -35,7 +36,7 @@ type containerManagerImpl struct {

 var _ ContainerManager = &containerManagerImpl{}

-func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
+func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
     glog.V(2).Infof("Starting Windows stub container manager")
     return nil
 }
```
```diff
@@ -20,6 +20,7 @@ go_library(
     deps = [
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
+        "//pkg/kubelet/config:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/lifecycle:go_default_library",
         "//plugin/pkg/scheduler/schedulercache:go_default_library",
```
```diff
@@ -34,6 +34,7 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
@@ -61,6 +62,10 @@ type ManagerImpl struct {
     // could be counted when updating allocated devices
     activePods ActivePodsFunc

+    // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
+    // We use it to determine when we can purge inactive pods from checkpointed state.
+    sourcesReady config.SourcesReady
+
     // callback is used for updating devices' states in one time call.
     // e.g. a new device is advertised, two old devices are deleted and a running device fails.
     callback monitorCallback
@@ -75,13 +80,17 @@ type ManagerImpl struct {
     podDevices podDevices
 }

-// NewManagerImpl creates a new manager. updateCapacityFunc is called to
-// update ContainerManager capacity when device capacity changes.
-func NewManagerImpl(updateCapacityFunc func(v1.ResourceList)) (*ManagerImpl, error) {
-    return newManagerImpl(updateCapacityFunc, pluginapi.KubeletSocket)
+type sourcesReadyStub struct{}
+
+func (s *sourcesReadyStub) AddSource(source string) {}
+func (s *sourcesReadyStub) AllReady() bool { return true }
+
+// NewManagerImpl creates a new manager.
+func NewManagerImpl() (*ManagerImpl, error) {
+    return newManagerImpl(pluginapi.KubeletSocket)
 }

-func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) (*ManagerImpl, error) {
+func newManagerImpl(socketPath string) (*ManagerImpl, error) {
     glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

     if socketPath == "" || !filepath.IsAbs(socketPath) {
```
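The no-op sourcesReadyStub above hints at the shape of the config.SourcesReady dependency that Start now receives. Roughly, it is an interface along these lines (a sketch of what the stub satisfies; the real definition lives in k8s.io/kubernetes/pkg/kubelet/config):

```go
// Sketch only: the interface sourcesReadyStub implements.
type SourcesReady interface {
	// AddSource adds a pod config source to the set whose readiness is tracked.
	AddSource(source string)
	// AllReady returns true once all registered pod sources (apiserver, file,
	// http) have delivered at least one update.
	AllReady() bool
}
```

Until ContainerManager.Start wires in the kubelet's real instance, the manager keeps the stub, whose AllReady always returns true, so pre-Start behavior is unchanged.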
```diff
@@ -97,36 +106,38 @@ func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string)
         allocatedDevices: make(map[string]sets.String),
         podDevices:       make(podDevices),
     }
+    manager.callback = manager.genericDeviceUpdateCallback

-    manager.callback = func(resourceName string, added, updated, deleted []pluginapi.Device) {
-        var capacity = v1.ResourceList{}
-        kept := append(updated, added...)
-
-        manager.mutex.Lock()
-        defer manager.mutex.Unlock()
-
-        if _, ok := manager.allDevices[resourceName]; !ok {
-            manager.allDevices[resourceName] = sets.NewString()
-        }
-        // For now, Manager only keeps track of healthy devices.
-        // We can revisit this later when the need comes to track unhealthy devices here.
-        for _, dev := range kept {
-            if dev.Health == pluginapi.Healthy {
-                manager.allDevices[resourceName].Insert(dev.ID)
-            } else {
-                manager.allDevices[resourceName].Delete(dev.ID)
-            }
-        }
-        for _, dev := range deleted {
-            manager.allDevices[resourceName].Delete(dev.ID)
-        }
-        capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(manager.allDevices[resourceName].Len()), resource.DecimalSI)
-        updateCapacityFunc(capacity)
-    }
+    // The following structs are populated with real implementations in manager.Start()
+    // Before that, initializes them to perform no-op operations.
+    manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
+    manager.sourcesReady = &sourcesReadyStub{}

     return manager, nil
 }

+func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
+    kept := append(updated, added...)
+    m.mutex.Lock()
+    if _, ok := m.allDevices[resourceName]; !ok {
+        m.allDevices[resourceName] = sets.NewString()
+    }
+    // For now, Manager only keeps track of healthy devices.
+    // TODO: adds support to track unhealthy devices.
+    for _, dev := range kept {
+        if dev.Health == pluginapi.Healthy {
+            m.allDevices[resourceName].Insert(dev.ID)
+        } else {
+            m.allDevices[resourceName].Delete(dev.ID)
+        }
+    }
+    for _, dev := range deleted {
+        m.allDevices[resourceName].Delete(dev.ID)
+    }
+    m.mutex.Unlock()
+    m.writeCheckpoint()
+}
+
 func (m *ManagerImpl) removeContents(dir string) error {
     d, err := os.Open(dir)
     if err != nil {
```
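genericDeviceUpdateCallback is the only place device health feeds the capacity bookkeeping: added and updated devices are kept only while Healthy, deleted devices are always dropped, and the checkpoint is rewritten afterwards. A tiny standalone illustration of just that set logic (plain maps instead of sets.String and the pluginapi types, purely illustrative):

```go
package main

import "fmt"

type device struct {
	ID      string
	Healthy bool
}

// apply mimics the callback's bookkeeping for a single resource: healthy
// added/updated devices are inserted, unhealthy ones are removed, and deleted
// devices are always removed.
func apply(all map[string]struct{}, added, updated, deleted []device) {
	kept := append(updated, added...)
	for _, d := range kept {
		if d.Healthy {
			all[d.ID] = struct{}{}
		} else {
			delete(all, d.ID)
		}
	}
	for _, d := range deleted {
		delete(all, d.ID)
	}
}

func main() {
	all := map[string]struct{}{}
	apply(all, []device{{"dev1", true}, {"dev2", true}, {"dev3", false}}, nil, nil)
	fmt.Println(len(all)) // 2: only healthy devices count toward capacity

	apply(all, nil, []device{{"dev2", false}}, nil) // dev2 turns unhealthy
	fmt.Println(len(all))                           // 1
}
```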
```diff
@@ -171,10 +182,11 @@ func (m *ManagerImpl) checkpointFile() string {
 // Start starts the Device Plugin Manager amd start initialization of
 // podDevices and allocatedDevices information from checkpoint-ed state and
 // starts device plugin registration service.
-func (m *ManagerImpl) Start(activePods ActivePodsFunc) error {
+func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
     glog.V(2).Infof("Starting Device Plugin manager")

     m.activePods = activePods
+    m.sourcesReady = sourcesReady

     // Loads in allocatedDevices information from disk.
     err := m.readCheckpoint()
@@ -238,6 +250,9 @@ func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
         }
     }

+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+
     // quick return if no pluginResources requested
     if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
         return nil
@@ -334,8 +349,6 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
         if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
             glog.V(2).Infof("Delete resource for endpoint %v", e)
             delete(m.endpoints, r.ResourceName)
-            // Issues callback to delete all of devices.
-            e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices())
         }

         glog.V(2).Infof("Unregistered endpoint %v", e)
@@ -343,10 +356,56 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
     }()
 }

+// GetCapacity is expected to be called when Kubelet updates its node status.
+// The first returned variable contains the registered device plugin resource capacity.
+// The second returned variable contains previously registered resources that are no longer active.
+// Kubelet uses this information to update resource capacity/allocatable in its node status.
+// After the call, device plugin can remove the inactive resources from its internal list as the
+// change is already reflected in Kubelet node status.
+// Note in the special case after Kubelet restarts, device plugin resource capacities can
+// temporarily drop to zero till corresponding device plugins re-register. This is OK because
+// cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
+// capacity for already allocated pods so that they can continue to run. However, new pods
+// requiring device plugin resources will not be scheduled till device plugin re-registers.
+func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
+    needsUpdateCheckpoint := false
+    var capacity = v1.ResourceList{}
+    var deletedResources []string
+    m.mutex.Lock()
+    for resourceName, devices := range m.allDevices {
+        if _, ok := m.endpoints[resourceName]; !ok {
+            delete(m.allDevices, resourceName)
+            deletedResources = append(deletedResources, resourceName)
+            needsUpdateCheckpoint = true
+        } else {
+            capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
+        }
+    }
+    m.mutex.Unlock()
+    if needsUpdateCheckpoint {
+        m.writeCheckpoint()
+    }
+    return capacity, deletedResources
+}
+
+// checkpointData struct is used to store pod to device allocation information
+// and registered device information in a checkpoint file.
+// TODO: add version control when we need to change checkpoint format.
+type checkpointData struct {
+    PodDeviceEntries  []podDevicesCheckpointEntry
+    RegisteredDevices map[string][]string
+}
+
 // Checkpoints device to container allocation information to disk.
 func (m *ManagerImpl) writeCheckpoint() error {
     m.mutex.Lock()
-    data := m.podDevices.toCheckpointData()
+    data := checkpointData{
+        PodDeviceEntries:  m.podDevices.toCheckpointData(),
+        RegisteredDevices: make(map[string][]string),
+    }
+    for resource, devices := range m.allDevices {
+        data.RegisteredDevices[resource] = devices.UnsortedList()
+    }
     m.mutex.Unlock()

     dataJSON, err := json.Marshal(data)
```
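With RegisteredDevices added alongside the per-container allocation entries, a kubelet that restarts after a plugin has disappeared can still report, and then drop, that resource. The checkpoint would serialize to JSON roughly as the sketch below shows; the standalone types only mirror the field names from the structs in this diff, and the values are made up:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Field names mirror podDevicesCheckpointEntry / checkpointData above.
type entry struct {
	PodUID        string
	ContainerName string
	ResourceName  string
	DeviceIDs     []string
	AllocResp     []byte
}

type checkpoint struct {
	PodDeviceEntries  []entry
	RegisteredDevices map[string][]string
}

func main() {
	data := checkpoint{
		PodDeviceEntries: []entry{{
			PodUID:        "pod-uid-1",
			ContainerName: "ctr",
			ResourceName:  "domain1.com/resource1",
			DeviceIDs:     []string{"dev1"},
		}},
		RegisteredDevices: map[string][]string{
			"domain1.com/resource1": {"dev1", "dev2"},
		},
	}
	out, _ := json.MarshalIndent(data, "", "  ")
	fmt.Println(string(out))
}
```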
```diff
@@ -373,14 +432,23 @@ func (m *ManagerImpl) readCheckpoint() error {

     m.mutex.Lock()
     defer m.mutex.Unlock()
-    m.podDevices.fromCheckpointData(data)
+    m.podDevices.fromCheckpointData(data.PodDeviceEntries)
     m.allocatedDevices = m.podDevices.devices()
+    for resource, devices := range data.RegisteredDevices {
+        m.allDevices[resource] = sets.NewString()
+        for _, dev := range devices {
+            m.allDevices[resource].Insert(dev)
+        }
+    }
     return nil
 }

 // updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
 // terminated pods. Returns error on failure.
 func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
+    if !m.sourcesReady.AllReady() {
+        return
+    }
     m.mutex.Lock()
     defer m.mutex.Unlock()
     activePodUids := sets.NewString()
```
```diff
@@ -392,7 +460,7 @@ func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
     if len(podsToBeRemoved) <= 0 {
         return
     }
-    glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
+    glog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
     m.podDevices.delete(podsToBeRemoved.List())
     // Regenerated allocatedDevices after we update pod allocation information.
     m.allocatedDevices = m.podDevices.devices()
@@ -420,6 +488,11 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
         // No change, no work.
         return nil, nil
     }
+    glog.V(3).Infof("Needs to allocate %v %v for pod %q container %q", needed, resource, podUID, contName)
+    // Needs to allocate additional devices.
+    if _, ok := m.allDevices[resource]; !ok {
+        return nil, fmt.Errorf("can't allocate unregistered device %v", resource)
+    }
     devices = sets.NewString()
     // Needs to allocate additional devices.
     if m.allocatedDevices[resource] == nil {
@@ -455,7 +528,11 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
         resource := string(k)
         needed := int(v.Value())
         glog.V(3).Infof("needs %d %s", needed, resource)
-        if _, registeredResource := m.allDevices[resource]; !registeredResource {
+        _, registeredResource := m.allDevices[resource]
+        _, allocatedResource := m.allocatedDevices[resource]
+        // Continues if this is neither an active device plugin resource nor
+        // a resource we have previously allocated.
+        if !registeredResource && !allocatedResource {
             continue
         }
         // Updates allocatedDevices to garbage collect any stranded resources
```
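The widened check in allocateContainerResources matters for exactly the lifecycle case this PR targets: after a device plugin is deleted, its resource disappears from allDevices but may still be recorded in allocatedDevices for pods that are already running, and those requests must still be processed rather than skipped. A minimal illustration of the condition (illustrative names, not the real code):

```go
package main

import "fmt"

// shouldHandle mirrors the widened condition: a requested resource is processed
// if its plugin is currently registered OR it was previously allocated to a pod.
func shouldHandle(registered, previouslyAllocated bool) bool {
	return registered || previouslyAllocated
}

func main() {
	fmt.Println(shouldHandle(true, false))  // active plugin: handle
	fmt.Println(shouldHandle(false, true))  // plugin deleted, pod still running: handle
	fmt.Println(shouldHandle(false, false)) // not a device plugin resource: skip
}
```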
```diff
@@ -19,6 +19,7 @@ package deviceplugin
 import (
     "k8s.io/api/core/v1"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
@@ -32,7 +33,7 @@ func NewManagerStub() (*ManagerStub, error) {
 }

 // Start simply returns nil.
-func (h *ManagerStub) Start(activePods ActivePodsFunc) error {
+func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
     return nil
 }

@@ -55,3 +56,8 @@ func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
 func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
     return nil
 }
+
+// GetCapacity simply returns nil capacity and empty removed resource list.
+func (h *ManagerStub) GetCapacity() (v1.ResourceList, []string) {
+    return nil, []string{}
+}
```
```diff
@@ -43,8 +43,7 @@ const (
 )

 func TestNewManagerImpl(t *testing.T) {
-    verifyCapacityFunc := func(updates v1.ResourceList) {}
-    _, err := newManagerImpl(verifyCapacityFunc, socketName)
+    _, err := newManagerImpl(socketName)
     require.NoError(t, err)
 }

@@ -123,8 +122,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
 }

 func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) {
-    updateCapacity := func(v1.ResourceList) {}
-    m, err := newManagerImpl(updateCapacity, socketName)
+    m, err := newManagerImpl(socketName)
     require.NoError(t, err)

     m.callback = callback
@@ -132,7 +130,7 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Ma
     activePods := func() []*v1.Pod {
         return []*v1.Pod{}
     }
-    err = m.Start(activePods)
+    err = m.Start(activePods, &sourcesReadyStub{})
     require.NoError(t, err)

     p := NewDevicePluginStub(devs, pluginSocketName)
@@ -148,12 +146,8 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
 }

 func TestUpdateCapacity(t *testing.T) {
-    var expected = v1.ResourceList{}
+    testManager, err := newManagerImpl(socketName)
     as := assert.New(t)
-    verifyCapacityFunc := func(updates v1.ResourceList) {
-        as.Equal(expected, updates)
-    }
-    testManager, err := newManagerImpl(verifyCapacityFunc, socketName)
     as.NotNil(testManager)
     as.Nil(err)

@@ -162,23 +156,68 @@ func TestUpdateCapacity(t *testing.T) {
         {ID: "Device2", Health: pluginapi.Healthy},
         {ID: "Device3", Health: pluginapi.Unhealthy},
     }
+    callback := testManager.genericDeviceUpdateCallback

-    resourceName := "resource1"
     // Adds three devices for resource1, two healthy and one unhealthy.
     // Expects capacity for resource1 to be 2.
-    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
-    testManager.callback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
+    resourceName1 := "domain1.com/resource1"
+    testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
+    callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
+    capacity, removedResources := testManager.GetCapacity()
+    resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
+    as.True(ok)
+    as.Equal(int64(2), resource1Capacity.Value())
+    as.Equal(0, len(removedResources))

     // Deletes an unhealthy device should NOT change capacity.
-    testManager.callback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
+    callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
+    capacity, removedResources = testManager.GetCapacity()
+    resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
+    as.True(ok)
+    as.Equal(int64(2), resource1Capacity.Value())
+    as.Equal(0, len(removedResources))

     // Updates a healthy device to unhealthy should reduce capacity by 1.
-    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
     dev2 := devs[1]
     dev2.Health = pluginapi.Unhealthy
+    callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
+    capacity, removedResources = testManager.GetCapacity()
+    resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
+    as.True(ok)
+    as.Equal(int64(1), resource1Capacity.Value())
+    as.Equal(0, len(removedResources))

     // Deletes a healthy device should reduce capacity by 1.
-    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
+    callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
+    capacity, removedResources = testManager.GetCapacity()
+    resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
+    as.True(ok)
+    as.Equal(int64(0), resource1Capacity.Value())
+    as.Equal(0, len(removedResources))

     // Tests adding another resource.
-    delete(expected, v1.ResourceName(resourceName))
     resourceName2 := "resource2"
-    expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
-    testManager.callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
+    testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
+    callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
+    capacity, removedResources = testManager.GetCapacity()
+    as.Equal(2, len(capacity))
+    resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
+    as.True(ok)
+    as.Equal(int64(2), resource2Capacity.Value())
+    as.Equal(0, len(removedResources))

+    // Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
+    // is removed from capacity and it no longer exists in allDevices after the call.
+    delete(testManager.endpoints, resourceName1)
+    capacity, removed := testManager.GetCapacity()
+    as.Equal([]string{resourceName1}, removed)
+    _, ok = capacity[v1.ResourceName(resourceName1)]
+    as.False(ok)
+    val, ok := capacity[v1.ResourceName(resourceName2)]
+    as.True(ok)
+    as.Equal(int64(2), val.Value())
+    _, ok = testManager.allDevices[resourceName1]
+    as.False(ok)
 }

 type stringPairType struct {
```
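Note the test resource is renamed from "resource1" to "domain1.com/resource1": device plugin resources are expected to be fully qualified extended resource names (vendor domain, slash, resource), which is presumably also why the per-resource IsExtendedResourceName filter could be dropped from the kubelet node status path later in this diff. A simplified, standalone approximation of that naming check (not the real v1helper implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// looksLikeExtendedResource is a rough stand-in for v1helper.IsExtendedResourceName:
// a vendor domain, a slash, and a resource name, outside the kubernetes.io domain.
func looksLikeExtendedResource(name string) bool {
	parts := strings.SplitN(name, "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return false
	}
	return !strings.HasSuffix(parts[0], "kubernetes.io")
}

func main() {
	fmt.Println(looksLikeExtendedResource("resource1"))             // false: no domain prefix
	fmt.Println(looksLikeExtendedResource("domain1.com/resource1")) // true
	fmt.Println(looksLikeExtendedResource("nvidia.com/gpu"))        // true
}
```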
```diff
@@ -245,8 +284,19 @@ func TestCheckpoint(t *testing.T) {
         constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
             map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))

+    testManager.allDevices[resourceName1] = sets.NewString()
+    testManager.allDevices[resourceName1].Insert("dev1")
+    testManager.allDevices[resourceName1].Insert("dev2")
+    testManager.allDevices[resourceName1].Insert("dev3")
+    testManager.allDevices[resourceName1].Insert("dev4")
+    testManager.allDevices[resourceName1].Insert("dev5")
+    testManager.allDevices[resourceName2] = sets.NewString()
+    testManager.allDevices[resourceName2].Insert("dev1")
+    testManager.allDevices[resourceName2].Insert("dev2")
+
     expectedPodDevices := testManager.podDevices
     expectedAllocatedDevices := testManager.podDevices.devices()
+    expectedAllDevices := testManager.allDevices

     err := testManager.writeCheckpoint()
     as := assert.New(t)
@@ -272,6 +322,7 @@ func TestCheckpoint(t *testing.T) {
         }
     }
     as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
+    as.True(reflect.DeepEqual(expectedAllDevices, testManager.allDevices))
 }

 type activePodsStub struct {
@@ -341,6 +392,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
         endpoints:        make(map[string]endpoint),
         podDevices:       make(podDevices),
         activePods:       podsStub.getActivePods,
+        sourcesReady:     &sourcesReadyStub{},
     }

     testManager.allDevices[resourceName1] = sets.NewString()
```
```diff
@@ -94,7 +94,8 @@ func (pdev podDevices) devices() map[string]sets.String {
     return ret
 }

-type checkpointEntry struct {
+// podDevicesCheckpointEntry is used to record <pod, container> to device allocation information.
+type podDevicesCheckpointEntry struct {
     PodUID        string
     ContainerName string
     ResourceName  string
@@ -102,16 +103,9 @@ type checkpointEntry struct {
     AllocResp []byte
 }

-// checkpointData struct is used to store pod to device allocation information
-// in a checkpoint file.
-// TODO: add version control when we need to change checkpoint format.
-type checkpointData struct {
-    Entries []checkpointEntry
-}
-
 // Turns podDevices to checkpointData.
-func (pdev podDevices) toCheckpointData() checkpointData {
-    var data checkpointData
+func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
+    var data []podDevicesCheckpointEntry
     for podUID, containerDevices := range pdev {
         for conName, resources := range containerDevices {
             for resource, devices := range resources {
@@ -126,7 +120,7 @@ func (pdev podDevices) toCheckpointData() checkpointData {
                 glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
                 continue
             }
-            data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resource, devIds, allocResp})
+            data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp})
         }
     }
 }
@@ -134,8 +128,8 @@ func (pdev podDevices) toCheckpointData() checkpointData {
 }

 // Populates podDevices from the passed in checkpointData.
-func (pdev podDevices) fromCheckpointData(data checkpointData) {
-    for _, entry := range data.Entries {
+func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) {
+    for _, entry := range data {
         glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
             entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
         devIDs := sets.NewString()
```
```diff
@@ -19,6 +19,7 @@ package deviceplugin
 import (
     "k8s.io/api/core/v1"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -27,7 +28,7 @@ import (
 // Manager manages all the Device Plugins running on a node.
 type Manager interface {
     // Start starts device plugin registration service.
-    Start(activePods ActivePodsFunc) error
+    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error

     // Devices is the map of devices that have registered themselves
     // against the manager.
@@ -51,6 +52,10 @@ type Manager interface {
     // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
     // for the found one. An empty struct is returned in case no cached state is found.
     GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
+
+    // GetCapacity returns the amount of available device plugin resource capacity
+    // and inactive device plugin resources previously registered on the node.
+    GetCapacity() (v1.ResourceList, []string)
 }

 // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
```
```diff
@@ -1304,7 +1304,7 @@ func (kl *Kubelet) initializeModules() error {
         return fmt.Errorf("Kubelet failed to get node info: %v", err)
     }

-    if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil {
+    if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
         return fmt.Errorf("Failed to start ContainerManager %v", err)
     }

```
```diff
@@ -601,15 +601,17 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
             }
         }

-        currentCapacity := kl.containerManager.GetCapacity()
-        if currentCapacity != nil {
-            for k, v := range currentCapacity {
-                if v1helper.IsExtendedResourceName(k) {
-                    glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
-                    node.Status.Capacity[k] = v
-                }
+        devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity()
+        if devicePluginCapacity != nil {
+            for k, v := range devicePluginCapacity {
+                glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
+                node.Status.Capacity[k] = v
+            }
+        }
+        for _, removedResource := range removedDevicePlugins {
+            glog.V(2).Infof("Remove capacity for %s", removedResource)
+            delete(node.Status.Capacity, v1.ResourceName(removedResource))
             }
         }

     // Set Allocatable.
```
```diff
@@ -18,7 +18,6 @@ package e2e_node

 import (
     "os/exec"
-    "path/filepath"
     "regexp"
     "time"

@@ -49,6 +48,7 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi
         initialConfig.FeatureGates[string(features.DevicePlugins)] = true
     })

+    var devicePluginPod *v1.Pod
     BeforeEach(func() {
         By("Ensuring that Nvidia GPUs exists on the node")
         if !checkIfNvidiaGPUsExistOnNode() {
@@ -56,7 +56,7 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi
         }

         By("Creating the Google Device Plugin pod for NVIDIA GPU in GKE")
-        f.PodClient().CreateSync(framework.NVIDIADevicePlugin(f.Namespace.Name))
+        devicePluginPod = f.PodClient().CreateSync(framework.NVIDIADevicePlugin(f.Namespace.Name))

         By("Waiting for GPUs to become available on the local node")
         Eventually(func() bool {
@@ -84,7 +84,7 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi
     It("checks that when Kubelet restarts exclusive GPU assignation to pods is kept.", func() {
         By("Creating one GPU pod on a node with at least two GPUs")
         p1 := f.PodClient().CreateSync(makeCudaPauseImage())
-        devId1 := getDeviceId(f, p1.Name, p1.Name, 1)
+        count1, devId1 := getDeviceId(f, p1.Name, p1.Name, 1)
         p1, err := f.PodClient().Get(p1.Name, metav1.GetOptions{})
         framework.ExpectNoError(err)

@@ -92,17 +92,36 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi
         restartKubelet(f)

         By("Confirming that after a kubelet and pod restart, GPU assignement is kept")
-        devIdRestart := getDeviceId(f, p1.Name, p1.Name, 2)
-        Expect(devIdRestart).To(Equal(devId1))
+        count1, devIdRestart1 := getDeviceId(f, p1.Name, p1.Name, count1+1)
+        Expect(devIdRestart1).To(Equal(devId1))

         By("Restarting Kubelet and creating another pod")
         restartKubelet(f)
         p2 := f.PodClient().CreateSync(makeCudaPauseImage())

         By("Checking that pods got a different GPU")
-        devId2 := getDeviceId(f, p2.Name, p2.Name, 1)
+        count2, devId2 := getDeviceId(f, p2.Name, p2.Name, 1)
         Expect(devId1).To(Not(Equal(devId2)))
+
+        By("Deleting device plugin.")
+        f.PodClient().Delete(devicePluginPod.Name, &metav1.DeleteOptions{})
+        By("Waiting for GPUs to become unavailable on the local node")
+        Eventually(func() bool {
+            return framework.NumberOfNVIDIAGPUs(getLocalNode(f)) <= 0
+        }, 10*time.Minute, framework.Poll).Should(BeTrue())
+        By("Checking that scheduled pods can continue to run even after we delete device plugin.")
+        count1, devIdRestart1 = getDeviceId(f, p1.Name, p1.Name, count1+1)
+        Expect(devIdRestart1).To(Equal(devId1))
+        count2, devIdRestart2 := getDeviceId(f, p2.Name, p2.Name, count2+1)
+        Expect(devIdRestart2).To(Equal(devId2))
+        By("Restarting Kubelet.")
+        restartKubelet(f)
+        By("Checking that scheduled pods can continue to run even after we delete device plugin and restart Kubelet.")
+        count1, devIdRestart1 = getDeviceId(f, p1.Name, p1.Name, count1+2)
+        Expect(devIdRestart1).To(Equal(devId1))
+        count2, devIdRestart2 = getDeviceId(f, p2.Name, p2.Name, count2+2)
+        Expect(devIdRestart2).To(Equal(devId2))

         // Cleanup
         f.PodClient().DeleteSync(p1.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
         f.PodClient().DeleteSync(p2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
@@ -140,9 +159,6 @@ func newDecimalResourceList(name v1.ResourceName, quantity int64) v1.ResourceLis

 // TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494
 func restartKubelet(f *framework.Framework) {
-    beforeSocks, err := filepath.Glob("/var/lib/kubelet/device-plugins/nvidiaGPU*.sock")
-    framework.ExpectNoError(err)
-    Expect(len(beforeSocks)).NotTo(BeZero())
     stdout, err := exec.Command("sudo", "systemctl", "list-units", "kubelet*", "--state=running").CombinedOutput()
     framework.ExpectNoError(err)
     regex := regexp.MustCompile("(kubelet-[0-9]+)")
@@ -152,19 +168,18 @@ func restartKubelet(f *framework.Framework) {
     framework.Logf("Get running kubelet with systemctl: %v, %v", string(stdout), kube)
     stdout, err = exec.Command("sudo", "systemctl", "restart", kube).CombinedOutput()
     framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %v", err, stdout)
-    Eventually(func() ([]string, error) {
-        return filepath.Glob("/var/lib/kubelet/device-plugins/nvidiaGPU*.sock")
-    }, 5*time.Minute, framework.Poll).ShouldNot(ConsistOf(beforeSocks))
 }

-func getDeviceId(f *framework.Framework, podName string, contName string, restartCount int32) string {
+func getDeviceId(f *framework.Framework, podName string, contName string, restartCount int32) (int32, string) {
+    var count int32
     // Wait till pod has been restarted at least restartCount times.
     Eventually(func() bool {
         p, err := f.PodClient().Get(podName, metav1.GetOptions{})
         if err != nil || len(p.Status.ContainerStatuses) < 1 {
             return false
         }
-        return p.Status.ContainerStatuses[0].RestartCount >= restartCount
+        count = p.Status.ContainerStatuses[0].RestartCount
+        return count >= restartCount
     }, 5*time.Minute, framework.Poll).Should(BeTrue())
     logs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, podName, contName)
     if err != nil {
@@ -174,7 +189,7 @@ func getDeviceId(f *framework.Framework, podName string, contName string, restar
     regex := regexp.MustCompile("gpu devices: (nvidia[0-9]+)")
     matches := regex.FindStringSubmatch(logs)
     if len(matches) < 2 {
-        return ""
+        return count, ""
     }
-    return matches[1]
+    return count, matches[1]
 }
```