Merge pull request #57266 from vikaschoudhary16/unhealthy_device

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Handle Unhealthy devices

Update node capacity with the sum of both healthy and unhealthy devices.
Node allocatable reflects only healthy devices.



**What this PR does / why we need it**:
Currently, node capacity reflects only healthy devices; unhealthy devices are ignored entirely when updating node status. This PR accounts for unhealthy devices when updating node status: node capacity now reports the sum of healthy and unhealthy devices, while node allocatable reports only the healthy ones.
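
To make the described accounting concrete, here is a minimal, self-contained sketch. It is not the actual manager code: the `healthyDevices`/`unhealthyDevices` names mirror the fields introduced in this PR, while the device IDs and counts are made up for illustration.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Two healthy devices and one unhealthy device for the same extended resource.
	healthyDevices := map[string]sets.String{
		"domain1.com/resource1": sets.NewString("dev1", "dev2"),
	}
	unhealthyDevices := map[string]sets.String{
		"domain1.com/resource1": sets.NewString("dev3"),
	}

	capacity := v1.ResourceList{}
	allocatable := v1.ResourceList{}

	// Allocatable counts only healthy devices; capacity starts from the same number...
	for name, devs := range healthyDevices {
		allocatable[v1.ResourceName(name)] = *resource.NewQuantity(int64(devs.Len()), resource.DecimalSI)
		capacity[v1.ResourceName(name)] = *resource.NewQuantity(int64(devs.Len()), resource.DecimalSI)
	}
	// ...and capacity additionally includes unhealthy devices.
	for name, devs := range unhealthyDevices {
		q := capacity[v1.ResourceName(name)]
		q.Add(*resource.NewQuantity(int64(devs.Len()), resource.DecimalSI))
		capacity[v1.ResourceName(name)] = q
	}

	c := capacity[v1.ResourceName("domain1.com/resource1")]
	a := allocatable[v1.ResourceName("domain1.com/resource1")]
	fmt.Printf("capacity=%d allocatable=%d\n", c.Value(), a.Value()) // capacity=3 allocatable=2
}
```

With two healthy devices and one unhealthy device, capacity reports 3 while allocatable reports 2, matching the expectations added in `TestUpdateCapacityAllocatable` below.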

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #57241

**Special notes for your reviewer**:
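
On the consumer side, the kubelet's node status update now receives three values instead of two. Below is a simplified, self-contained sketch of that flow. It mirrors the `setNodeStatusMachineInfo` change in this PR in spirit only; the helper name `applyDevicePluginResources` and the literal values are made up for illustration.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// applyDevicePluginResources is an illustrative stand-in for the logic in the
// kubelet's node status update: capacity gets the total device count,
// allocatable only the healthy count, and resources whose plugins are no
// longer active are dropped from capacity.
func applyDevicePluginResources(node *v1.Node, capacity, allocatable v1.ResourceList, removed []string) {
	for k, v := range capacity {
		node.Status.Capacity[k] = v
	}
	for _, r := range removed {
		delete(node.Status.Capacity, v1.ResourceName(r))
	}
	for k, v := range allocatable {
		node.Status.Allocatable[k] = v
	}
}

func main() {
	node := &v1.Node{}
	node.Status.Capacity = v1.ResourceList{}
	node.Status.Allocatable = v1.ResourceList{}

	// Values as they might come back from GetDevicePluginResourceCapacity():
	// 3 devices total, of which 2 are healthy, and no removed resources.
	capacity := v1.ResourceList{"domain1.com/resource1": *resource.NewQuantity(3, resource.DecimalSI)}
	allocatable := v1.ResourceList{"domain1.com/resource1": *resource.NewQuantity(2, resource.DecimalSI)}

	applyDevicePluginResources(node, capacity, allocatable, nil)

	c := node.Status.Capacity["domain1.com/resource1"]
	a := node.Status.Allocatable["domain1.com/resource1"]
	fmt.Printf("node capacity=%d allocatable=%d\n", c.Value(), a.Value()) // node capacity=3 allocatable=2
}
```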

**Release note**:
```release-note
Handle Unhealthy devices
```
/cc @tengqm @ConnorDoyle @jiayingz @vishh @jeremyeder @sjenning @resouer @ScorpioCPH @lichuqiang @RenaudWasTaken @balajismaniam 

/sig node
Commit f2e46a2147 by Kubernetes Submit Queue, 2018-01-12 19:55:54 -08:00 (committed by GitHub).
8 changed files with 128 additions and 64 deletions


@ -70,9 +70,10 @@ type ContainerManager interface {
// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList
// GetDevicePluginResourceCapacity returns the amount of device plugin resources available on the node
// GetDevicePluginResourceCapacity returns the node capacity (amount of total device plugin resources),
// node allocatable (amount of total healthy resources reported by device plugin),
// and inactive device plugin resources previously registered on the node.
GetDevicePluginResourceCapacity() (v1.ResourceList, []string)
GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way


@ -887,6 +887,6 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
return cm.capacity
}
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return cm.devicePluginManager.GetCapacity()
}


@ -70,8 +70,8 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return nil
}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
return nil, []string{}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{}
}
func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {


@ -73,8 +73,11 @@ type ManagerImpl struct {
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback
// allDevices contains all of registered resourceNames and their exported device IDs.
allDevices map[string]sets.String
// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
healthyDevices map[string]sets.String
// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
unhealthyDevices map[string]sets.String
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
allocatedDevices map[string]sets.String
@ -106,7 +109,8 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
endpoints: make(map[string]endpoint),
socketname: file,
socketdir: dir,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}
@ -128,20 +132,24 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
kept := append(updated, added...)
m.mutex.Lock()
if _, ok := m.allDevices[resourceName]; !ok {
m.allDevices[resourceName] = sets.NewString()
if _, ok := m.healthyDevices[resourceName]; !ok {
m.healthyDevices[resourceName] = sets.NewString()
}
if _, ok := m.unhealthyDevices[resourceName]; !ok {
m.unhealthyDevices[resourceName] = sets.NewString()
}
// For now, Manager only keeps track of healthy devices.
// TODO: adds support to track unhealthy devices.
for _, dev := range kept {
if dev.Health == pluginapi.Healthy {
m.allDevices[resourceName].Insert(dev.ID)
m.healthyDevices[resourceName].Insert(dev.ID)
m.unhealthyDevices[resourceName].Delete(dev.ID)
} else {
m.allDevices[resourceName].Delete(dev.ID)
m.unhealthyDevices[resourceName].Insert(dev.ID)
m.healthyDevices[resourceName].Delete(dev.ID)
}
}
for _, dev := range deleted {
m.allDevices[resourceName].Delete(dev.ID)
m.healthyDevices[resourceName].Delete(dev.ID)
m.unhealthyDevices[resourceName].Delete(dev.ID)
}
m.mutex.Unlock()
m.writeCheckpoint()
@ -371,7 +379,8 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// GetCapacity is expected to be called when Kubelet updates its node status.
// The first returned variable contains the registered device plugin resource capacity.
// The second returned variable contains previously registered resources that are no longer active.
// The second returned variable contains the registered device plugin resource allocatable.
// The third returned variable contains previously registered resources that are no longer active.
// Kubelet uses this information to update resource capacity/allocatable in its node status.
// After the call, device plugin can remove the inactive resources from its internal list as the
// change is already reflected in Kubelet node status.
@ -380,25 +389,47 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
// capacity for already allocated pods so that they can continue to run. However, new pods
// requiring device plugin resources will not be scheduled till device plugin re-registers.
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
needsUpdateCheckpoint := false
var capacity = v1.ResourceList{}
var allocatable = v1.ResourceList{}
var deletedResources []string
m.mutex.Lock()
for resourceName, devices := range m.allDevices {
for resourceName, devices := range m.healthyDevices {
if _, ok := m.endpoints[resourceName]; !ok {
delete(m.allDevices, resourceName)
delete(m.healthyDevices, resourceName)
deletedResources = append(deletedResources, resourceName)
needsUpdateCheckpoint = true
} else {
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
}
}
for resourceName, devices := range m.unhealthyDevices {
if _, ok := m.endpoints[resourceName]; !ok {
delete(m.unhealthyDevices, resourceName)
alreadyDeleted := false
for _, name := range deletedResources {
if name == resourceName {
alreadyDeleted = true
}
}
if !alreadyDeleted {
deletedResources = append(deletedResources, resourceName)
}
needsUpdateCheckpoint = true
} else {
capacityCount := capacity[v1.ResourceName(resourceName)]
unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
capacityCount.Add(unhealthyCount)
capacity[v1.ResourceName(resourceName)] = capacityCount
}
}
m.mutex.Unlock()
if needsUpdateCheckpoint {
m.writeCheckpoint()
}
return capacity, deletedResources
return capacity, allocatable, deletedResources
}
// checkpointData struct is used to store pod to device allocation information
@ -416,7 +447,7 @@ func (m *ManagerImpl) writeCheckpoint() error {
PodDeviceEntries: m.podDevices.toCheckpointData(),
RegisteredDevices: make(map[string][]string),
}
for resource, devices := range m.allDevices {
for resource, devices := range m.healthyDevices {
data.RegisteredDevices[resource] = devices.UnsortedList()
}
m.mutex.Unlock()
@ -453,9 +484,10 @@ func (m *ManagerImpl) readCheckpoint() error {
m.podDevices.fromCheckpointData(data.PodDeviceEntries)
m.allocatedDevices = m.podDevices.devices()
for resource, devices := range data.RegisteredDevices {
m.allDevices[resource] = sets.NewString()
// TODO: Support Checkpointing for unhealthy devices as well
m.healthyDevices[resource] = sets.NewString()
for _, dev := range devices {
m.allDevices[resource].Insert(dev)
m.healthyDevices[resource].Insert(dev)
}
}
return nil
@ -508,7 +540,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
}
glog.V(3).Infof("Needs to allocate %v %v for pod %q container %q", needed, resource, podUID, contName)
// Needs to allocate additional devices.
if _, ok := m.allDevices[resource]; !ok {
if _, ok := m.healthyDevices[resource]; !ok {
return nil, fmt.Errorf("can't allocate unregistered device %v", resource)
}
devices = sets.NewString()
@ -527,7 +559,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
// Gets Devices in use.
devicesInUse := m.allocatedDevices[resource]
// Gets a list of available devices.
available := m.allDevices[resource].Difference(devicesInUse)
available := m.healthyDevices[resource].Difference(devicesInUse)
if int(available.Len()) < needed {
return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
}
@ -558,7 +590,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
resource := string(k)
needed := int(v.Value())
glog.V(3).Infof("needs %d %s", needed, resource)
_, registeredResource := m.allDevices[resource]
_, registeredResource := m.healthyDevices[resource]
_, allocatedResource := m.allocatedDevices[resource]
// Continues if this is neither an active device plugin resource nor
// a resource we have previously allocated.


@ -58,6 +58,6 @@ func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
}
// GetCapacity simply returns nil capacity and empty removed resource list.
func (h *ManagerStub) GetCapacity() (v1.ResourceList, []string) {
return nil, []string{}
func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{}
}


@ -138,7 +138,7 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
m.Stop()
}
func TestUpdateCapacity(t *testing.T) {
func TestUpdateCapacityAllocatable(t *testing.T) {
testManager, err := newManagerImpl(socketName)
as := assert.New(t)
as.NotNil(testManager)
@ -156,61 +156,81 @@ func TestUpdateCapacity(t *testing.T) {
resourceName1 := "domain1.com/resource1"
testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources := testManager.GetCapacity()
capacity, allocatable, removedResources := testManager.GetCapacity()
resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(3), resource1Capacity.Value())
as.Equal(int64(2), resource1Allocatable.Value())
as.Equal(0, len(removedResources))
// Deletes an unhealthy device should NOT change capacity.
// Deletes an unhealthy device should NOT change allocatable but change capacity.
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
as.Equal(int64(2), resource1Allocatable.Value())
as.Equal(0, len(removedResources))
// Updates a healthy device to unhealthy should reduce capacity by 1.
// Updates a healthy device to unhealthy should reduce allocatable by 1.
dev2 := devs[1]
dev2.Health = pluginapi.Unhealthy
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(1), resource1Capacity.Value())
resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
as.Equal(int64(1), resource1Allocatable.Value())
as.Equal(0, len(removedResources))
// Deletes a healthy device should reduce capacity by 1.
// Deletes a healthy device should reduce capacity and allocatable by 1.
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(0), resource1Capacity.Value())
resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(0), resource1Allocatable.Value())
as.Equal(int64(1), resource1Capacity.Value())
as.Equal(0, len(removedResources))
// Tests adding another resource.
resourceName2 := "resource2"
testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
as.Equal(2, len(capacity))
resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), resource2Capacity.Value())
resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(3), resource2Capacity.Value())
as.Equal(int64(2), resource2Allocatable.Value())
as.Equal(0, len(removedResources))
// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
// is removed from capacity and it no longer exists in allDevices after the call.
// is removed from capacity and it no longer exists in healthyDevices after the call.
delete(testManager.endpoints, resourceName1)
capacity, removed := testManager.GetCapacity()
capacity, allocatable, removed := testManager.GetCapacity()
as.Equal([]string{resourceName1}, removed)
_, ok = capacity[v1.ResourceName(resourceName1)]
as.False(ok)
val, ok := capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), val.Value())
_, ok = testManager.allDevices[resourceName1]
as.Equal(int64(3), val.Value())
_, ok = testManager.healthyDevices[resourceName1]
as.False(ok)
_, ok = testManager.unhealthyDevices[resourceName1]
as.False(ok)
fmt.Println("removed: ", removed)
as.Equal(1, len(removed))
}
type stringPairType struct {
@ -259,7 +279,7 @@ func TestCheckpoint(t *testing.T) {
defer os.RemoveAll(tmpDir)
testManager := &ManagerImpl{
socketdir: tmpDir,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}
@ -283,19 +303,19 @@ func TestCheckpoint(t *testing.T) {
constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
testManager.allDevices[resourceName1] = sets.NewString()
testManager.allDevices[resourceName1].Insert("dev1")
testManager.allDevices[resourceName1].Insert("dev2")
testManager.allDevices[resourceName1].Insert("dev3")
testManager.allDevices[resourceName1].Insert("dev4")
testManager.allDevices[resourceName1].Insert("dev5")
testManager.allDevices[resourceName2] = sets.NewString()
testManager.allDevices[resourceName2].Insert("dev1")
testManager.allDevices[resourceName2].Insert("dev2")
testManager.healthyDevices[resourceName1] = sets.NewString()
testManager.healthyDevices[resourceName1].Insert("dev1")
testManager.healthyDevices[resourceName1].Insert("dev2")
testManager.healthyDevices[resourceName1].Insert("dev3")
testManager.healthyDevices[resourceName1].Insert("dev4")
testManager.healthyDevices[resourceName1].Insert("dev5")
testManager.healthyDevices[resourceName2] = sets.NewString()
testManager.healthyDevices[resourceName2].Insert("dev1")
testManager.healthyDevices[resourceName2].Insert("dev2")
expectedPodDevices := testManager.podDevices
expectedAllocatedDevices := testManager.podDevices.devices()
expectedAllDevices := testManager.allDevices
expectedAllDevices := testManager.healthyDevices
err = testManager.writeCheckpoint()
@ -320,7 +340,7 @@ func TestCheckpoint(t *testing.T) {
}
}
as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
as.True(reflect.DeepEqual(expectedAllDevices, testManager.allDevices))
as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
}
type activePodsStub struct {
@ -377,7 +397,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
testManager := &ManagerImpl{
socketdir: tmpDir,
callback: monitorCallback,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpoint),
podDevices: make(podDevices),
@ -386,9 +406,9 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
}
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
for _, res := range testRes {
testManager.allDevices[res.resourceName] = sets.NewString()
testManager.healthyDevices[res.resourceName] = sets.NewString()
for _, dev := range res.devs {
testManager.allDevices[res.resourceName].Insert(dev)
testManager.healthyDevices[res.resourceName].Insert(dev)
}
if res.resourceName == "domain1.com/resource1" {
testManager.endpoints[res.resourceName] = &MockEndpoint{
@ -675,7 +695,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
testManager := &ManagerImpl{
callback: monitorCallback,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}


@ -53,9 +53,9 @@ type Manager interface {
// for the found one. An empty struct is returned in case no cached state is found.
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
// GetCapacity returns the amount of available device plugin resource capacity
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, []string)
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.


@ -548,6 +548,10 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}
var devicePluginAllocatable v1.ResourceList
var devicePluginCapacity v1.ResourceList
var removedDevicePlugins []string
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
@ -592,13 +596,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}
devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity()
devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
node.Status.Capacity[k] = v
}
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Remove capacity for %s", removedResource)
delete(node.Status.Capacity, v1.ResourceName(removedResource))
@ -629,6 +634,12 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
node.Status.Allocatable[k] = value
}
if devicePluginAllocatable != nil {
for k, v := range devicePluginAllocatable {
glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
node.Status.Allocatable[k] = v
}
}
// for every huge page reservation, we need to remove it from allocatable memory
for k, v := range node.Status.Capacity {
if v1helper.IsHugePageResourceName(k) {