Handle Unhealthy devices

Update node capacity with the sum of both healthy and unhealthy devices.
Node allocatable reflects only healthy devices.
vikaschoudhary16 2017-12-16 01:38:46 -05:00
parent 8d24ce1137
commit e9cf3f1ac4
8 changed files with 128 additions and 64 deletions
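For illustration only (not part of the commit): a minimal, self-contained Go sketch of the aggregation rule this change introduces, where node capacity counts both healthy and unhealthy devices while allocatable counts only the healthy ones. The helper name countsToResourceLists and the plain int maps are hypothetical stand-ins for the manager's healthyDevices/unhealthyDevices sets; the real logic lives in ManagerImpl.GetCapacity() in the diff below.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// countsToResourceLists (hypothetical helper) mirrors the aggregation in
// ManagerImpl.GetCapacity(): allocatable counts only healthy devices, while
// capacity is the per-resource sum of healthy and unhealthy devices.
func countsToResourceLists(healthy, unhealthy map[string]int) (capacity, allocatable v1.ResourceList) {
	capacity = v1.ResourceList{}
	allocatable = v1.ResourceList{}
	for name, n := range healthy {
		q := *resource.NewQuantity(int64(n), resource.DecimalSI)
		allocatable[v1.ResourceName(name)] = q
		capacity[v1.ResourceName(name)] = q
	}
	for name, n := range unhealthy {
		// Unhealthy devices contribute to capacity but not to allocatable.
		q := capacity[v1.ResourceName(name)]
		q.Add(*resource.NewQuantity(int64(n), resource.DecimalSI))
		capacity[v1.ResourceName(name)] = q
	}
	return capacity, allocatable
}

func main() {
	capacity, allocatable := countsToResourceLists(
		map[string]int{"domain1.com/resource1": 2}, // healthy device count
		map[string]int{"domain1.com/resource1": 1}, // unhealthy device count
	)
	name := v1.ResourceName("domain1.com/resource1")
	c := capacity[name]
	a := allocatable[name]
	fmt.Printf("capacity=%s allocatable=%s\n", c.String(), a.String())
}

With two healthy and one unhealthy device this prints capacity=3 allocatable=2, matching the expectations in TestUpdateCapacityAllocatable further down in the diff.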

View File

@@ -70,9 +70,10 @@ type ContainerManager interface {
// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList
// GetDevicePluginResourceCapacity returns the amount of device plugin resources available on the node
// GetDevicePluginResourceCapacity returns the node capacity (amount of total device plugin resources),
// node allocatable (amount of total healthy resources reported by device plugin),
// and inactive device plugin resources previously registered on the node.
GetDevicePluginResourceCapacity() (v1.ResourceList, []string)
GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way

View File

@@ -887,6 +887,6 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
return cm.capacity
}
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return cm.devicePluginManager.GetCapacity()
}

View File

@@ -70,8 +70,8 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return nil
}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
return nil, []string{}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{}
}
func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {

View File

@@ -73,8 +73,11 @@ type ManagerImpl struct {
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback
// allDevices contains all of registered resourceNames and their exported device IDs.
allDevices map[string]sets.String
// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
healthyDevices map[string]sets.String
// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
unhealthyDevices map[string]sets.String
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
allocatedDevices map[string]sets.String
@@ -106,7 +109,8 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
endpoints: make(map[string]endpoint),
socketname: file,
socketdir: dir,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}
@@ -128,20 +132,24 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
kept := append(updated, added...)
m.mutex.Lock()
if _, ok := m.allDevices[resourceName]; !ok {
m.allDevices[resourceName] = sets.NewString()
if _, ok := m.healthyDevices[resourceName]; !ok {
m.healthyDevices[resourceName] = sets.NewString()
}
if _, ok := m.unhealthyDevices[resourceName]; !ok {
m.unhealthyDevices[resourceName] = sets.NewString()
}
// For now, Manager only keeps track of healthy devices.
// TODO: adds support to track unhealthy devices.
for _, dev := range kept {
if dev.Health == pluginapi.Healthy {
m.allDevices[resourceName].Insert(dev.ID)
m.healthyDevices[resourceName].Insert(dev.ID)
m.unhealthyDevices[resourceName].Delete(dev.ID)
} else {
m.allDevices[resourceName].Delete(dev.ID)
m.unhealthyDevices[resourceName].Insert(dev.ID)
m.healthyDevices[resourceName].Delete(dev.ID)
}
}
for _, dev := range deleted {
m.allDevices[resourceName].Delete(dev.ID)
m.healthyDevices[resourceName].Delete(dev.ID)
m.unhealthyDevices[resourceName].Delete(dev.ID)
}
m.mutex.Unlock()
m.writeCheckpoint()
@@ -371,7 +379,8 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// GetCapacity is expected to be called when Kubelet updates its node status.
// The first returned variable contains the registered device plugin resource capacity.
// The second returned variable contains previously registered resources that are no longer active.
// The second returned variable contains the registered device plugin resource allocatable.
// The third returned variable contains previously registered resources that are no longer active.
// Kubelet uses this information to update resource capacity/allocatable in its node status.
// After the call, device plugin can remove the inactive resources from its internal list as the
// change is already reflected in Kubelet node status.
@@ -380,25 +389,47 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
// capacity for already allocated pods so that they can continue to run. However, new pods
// requiring device plugin resources will not be scheduled till device plugin re-registers.
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
needsUpdateCheckpoint := false
var capacity = v1.ResourceList{}
var allocatable = v1.ResourceList{}
var deletedResources []string
m.mutex.Lock()
for resourceName, devices := range m.allDevices {
for resourceName, devices := range m.healthyDevices {
if _, ok := m.endpoints[resourceName]; !ok {
delete(m.allDevices, resourceName)
delete(m.healthyDevices, resourceName)
deletedResources = append(deletedResources, resourceName)
needsUpdateCheckpoint = true
} else {
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
}
}
for resourceName, devices := range m.unhealthyDevices {
if _, ok := m.endpoints[resourceName]; !ok {
delete(m.unhealthyDevices, resourceName)
alreadyDeleted := false
for _, name := range deletedResources {
if name == resourceName {
alreadyDeleted = true
}
}
if !alreadyDeleted {
deletedResources = append(deletedResources, resourceName)
}
needsUpdateCheckpoint = true
} else {
capacityCount := capacity[v1.ResourceName(resourceName)]
unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
capacityCount.Add(unhealthyCount)
capacity[v1.ResourceName(resourceName)] = capacityCount
}
}
m.mutex.Unlock()
if needsUpdateCheckpoint {
m.writeCheckpoint()
}
return capacity, deletedResources
return capacity, allocatable, deletedResources
}
// checkpointData struct is used to store pod to device allocation information
@@ -416,7 +447,7 @@ func (m *ManagerImpl) writeCheckpoint() error {
PodDeviceEntries: m.podDevices.toCheckpointData(),
RegisteredDevices: make(map[string][]string),
}
for resource, devices := range m.allDevices {
for resource, devices := range m.healthyDevices {
data.RegisteredDevices[resource] = devices.UnsortedList()
}
m.mutex.Unlock()
@@ -453,9 +484,10 @@ func (m *ManagerImpl) readCheckpoint() error {
m.podDevices.fromCheckpointData(data.PodDeviceEntries)
m.allocatedDevices = m.podDevices.devices()
for resource, devices := range data.RegisteredDevices {
m.allDevices[resource] = sets.NewString()
// TODO: Support Checkpointing for unhealthy devices as well
m.healthyDevices[resource] = sets.NewString()
for _, dev := range devices {
m.allDevices[resource].Insert(dev)
m.healthyDevices[resource].Insert(dev)
}
}
return nil
@@ -508,7 +540,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
}
glog.V(3).Infof("Needs to allocate %v %v for pod %q container %q", needed, resource, podUID, contName)
// Needs to allocate additional devices.
if _, ok := m.allDevices[resource]; !ok {
if _, ok := m.healthyDevices[resource]; !ok {
return nil, fmt.Errorf("can't allocate unregistered device %v", resource)
}
devices = sets.NewString()
@@ -527,7 +559,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
// Gets Devices in use.
devicesInUse := m.allocatedDevices[resource]
// Gets a list of available devices.
available := m.allDevices[resource].Difference(devicesInUse)
available := m.healthyDevices[resource].Difference(devicesInUse)
if int(available.Len()) < needed {
return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
}
@@ -557,7 +589,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
resource := string(k)
needed := int(v.Value())
glog.V(3).Infof("needs %d %s", needed, resource)
_, registeredResource := m.allDevices[resource]
_, registeredResource := m.healthyDevices[resource]
_, allocatedResource := m.allocatedDevices[resource]
// Continues if this is neither an active device plugin resource nor
// a resource we have previously allocated.

View File

@@ -58,6 +58,6 @@ func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
}
// GetCapacity simply returns nil capacity and empty removed resource list.
func (h *ManagerStub) GetCapacity() (v1.ResourceList, []string) {
return nil, []string{}
func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{}
}

View File

@@ -149,7 +149,7 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
m.Stop()
}
func TestUpdateCapacity(t *testing.T) {
func TestUpdateCapacityAllocatable(t *testing.T) {
testManager, err := newManagerImpl(socketName)
as := assert.New(t)
as.NotNil(testManager)
@@ -167,61 +167,81 @@
resourceName1 := "domain1.com/resource1"
testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources := testManager.GetCapacity()
capacity, allocatable, removedResources := testManager.GetCapacity()
resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(3), resource1Capacity.Value())
as.Equal(int64(2), resource1Allocatable.Value())
as.Equal(0, len(removedResources))
// Deletes an unhealthy device should NOT change capacity.
// Deletes an unhealthy device should NOT change allocatable but change capacity.
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
as.Equal(int64(2), resource1Allocatable.Value())
as.Equal(0, len(removedResources))
// Updates a healthy device to unhealthy should reduce capacity by 1.
// Updates a healthy device to unhealthy should reduce allocatable by 1.
dev2 := devs[1]
dev2.Health = pluginapi.Unhealthy
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(1), resource1Capacity.Value())
resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
as.Equal(int64(1), resource1Allocatable.Value())
as.Equal(0, len(removedResources))
// Deletes a healthy device should reduce capacity by 1.
// Deletes a healthy device should reduce capacity and allocatable by 1.
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(0), resource1Capacity.Value())
resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(0), resource1Allocatable.Value())
as.Equal(int64(1), resource1Capacity.Value())
as.Equal(0, len(removedResources))
// Tests adding another resource.
resourceName2 := "resource2"
testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
capacity, allocatable, removedResources = testManager.GetCapacity()
as.Equal(2, len(capacity))
resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), resource2Capacity.Value())
resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(3), resource2Capacity.Value())
as.Equal(int64(2), resource2Allocatable.Value())
as.Equal(0, len(removedResources))
// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
// is removed from capacity and it no longer exists in allDevices after the call.
// is removed from capacity and it no longer exists in healthyDevices after the call.
delete(testManager.endpoints, resourceName1)
capacity, removed := testManager.GetCapacity()
capacity, allocatable, removed := testManager.GetCapacity()
as.Equal([]string{resourceName1}, removed)
_, ok = capacity[v1.ResourceName(resourceName1)]
as.False(ok)
val, ok := capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), val.Value())
_, ok = testManager.allDevices[resourceName1]
as.Equal(int64(3), val.Value())
_, ok = testManager.healthyDevices[resourceName1]
as.False(ok)
_, ok = testManager.unhealthyDevices[resourceName1]
as.False(ok)
fmt.Println("removed: ", removed)
as.Equal(1, len(removed))
}
type stringPairType struct {
@@ -270,7 +290,7 @@ func TestCheckpoint(t *testing.T) {
defer os.RemoveAll(tmpDir)
testManager := &ManagerImpl{
socketdir: tmpDir,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}
@@ -294,19 +314,19 @@ func TestCheckpoint(t *testing.T) {
constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
testManager.allDevices[resourceName1] = sets.NewString()
testManager.allDevices[resourceName1].Insert("dev1")
testManager.allDevices[resourceName1].Insert("dev2")
testManager.allDevices[resourceName1].Insert("dev3")
testManager.allDevices[resourceName1].Insert("dev4")
testManager.allDevices[resourceName1].Insert("dev5")
testManager.allDevices[resourceName2] = sets.NewString()
testManager.allDevices[resourceName2].Insert("dev1")
testManager.allDevices[resourceName2].Insert("dev2")
testManager.healthyDevices[resourceName1] = sets.NewString()
testManager.healthyDevices[resourceName1].Insert("dev1")
testManager.healthyDevices[resourceName1].Insert("dev2")
testManager.healthyDevices[resourceName1].Insert("dev3")
testManager.healthyDevices[resourceName1].Insert("dev4")
testManager.healthyDevices[resourceName1].Insert("dev5")
testManager.healthyDevices[resourceName2] = sets.NewString()
testManager.healthyDevices[resourceName2].Insert("dev1")
testManager.healthyDevices[resourceName2].Insert("dev2")
expectedPodDevices := testManager.podDevices
expectedAllocatedDevices := testManager.podDevices.devices()
expectedAllDevices := testManager.allDevices
expectedAllDevices := testManager.healthyDevices
err = testManager.writeCheckpoint()
@@ -331,7 +351,7 @@
}
}
as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
as.True(reflect.DeepEqual(expectedAllDevices, testManager.allDevices))
as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
}
type activePodsStub struct {
@@ -388,7 +408,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
testManager := &ManagerImpl{
socketdir: tmpDir,
callback: monitorCallback,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpoint),
podDevices: make(podDevices),
@@ -397,9 +417,9 @@
}
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
for _, res := range testRes {
testManager.allDevices[res.resourceName] = sets.NewString()
testManager.healthyDevices[res.resourceName] = sets.NewString()
for _, dev := range res.devs {
testManager.allDevices[res.resourceName].Insert(dev)
testManager.healthyDevices[res.resourceName].Insert(dev)
}
if res.resourceName == "domain1.com/resource1" {
testManager.endpoints[res.resourceName] = &MockEndpoint{
@@ -682,7 +702,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
testManager := &ManagerImpl{
callback: monitorCallback,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}

View File

@@ -53,9 +53,9 @@ type Manager interface {
// for the found one. An empty struct is returned in case no cached state is found.
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
// GetCapacity returns the amount of available device plugin resource capacity
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, []string)
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

View File

@@ -550,6 +550,10 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}
var devicePluginAllocatable v1.ResourceList
var devicePluginCapacity v1.ResourceList
var removedDevicePlugins []string
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
@@ -594,13 +598,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}
devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity()
devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
node.Status.Capacity[k] = v
}
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Remove capacity for %s", removedResource)
delete(node.Status.Capacity, v1.ResourceName(removedResource))
@@ -631,6 +636,12 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
node.Status.Allocatable[k] = value
}
if devicePluginAllocatable != nil {
for k, v := range devicePluginAllocatable {
glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
node.Status.Allocatable[k] = v
}
}
// for every huge page reservation, we need to remove it from allocatable memory
for k, v := range node.Status.Capacity {
if v1helper.IsHugePageResourceName(k) {