diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go
index fd61f2a751b..5dbe6d9b1b5 100644
--- a/pkg/kubelet/cm/container_manager.go
+++ b/pkg/kubelet/cm/container_manager.go
@@ -70,9 +70,10 @@ type ContainerManager interface {
 	// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
 	GetCapacity() v1.ResourceList
 
-	// GetDevicePluginResourceCapacity returns the amount of device plugin resources available on the node
+	// GetDevicePluginResourceCapacity returns the node capacity (total amount of device plugin resources),
+	// the node allocatable (total amount of healthy resources reported by device plugins),
 	// and inactive device plugin resources previously registered on the node.
-	GetDevicePluginResourceCapacity() (v1.ResourceList, []string)
+	GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
 
 	// UpdateQOSCgroups performs housekeeping updates to ensure that the top
 	// level QoS containers have their desired state in a thread-safe way
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index f72ec699c96..51d8a4493b8 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -887,6 +887,6 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
 	return cm.capacity
 }
 
-func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
+func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
 	return cm.devicePluginManager.GetCapacity()
 }
diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go
index 83df4cfaad8..a00dc4e9fbf 100644
--- a/pkg/kubelet/cm/container_manager_stub.go
+++ b/pkg/kubelet/cm/container_manager_stub.go
@@ -70,8 +70,8 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
 	return nil
 }
 
-func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
-	return nil, []string{}
+func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
+	return nil, nil, []string{}
}
 
 func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
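The interface change above ripples through every implementation: callers now receive capacity (all registered devices), allocatable (healthy devices only), and the names of inactive resources. A minimal consumer-side sketch of the new contract follows; the fakeCM type, the vendor.com/gpu resource name, and the vendor.com/stale entry are invented for illustration, and only the three-value shape mirrors the PR.

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// fakeCM is a stand-in for the kubelet's container manager, used only to
// exercise the new three-value signature.
type fakeCM struct{}

func (fakeCM) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	q := func(n int64) resource.Quantity { return *resource.NewQuantity(n, resource.DecimalSI) }
	capacity := v1.ResourceList{"vendor.com/gpu": q(3)}    // healthy + unhealthy devices
	allocatable := v1.ResourceList{"vendor.com/gpu": q(2)} // healthy devices only
	return capacity, allocatable, []string{"vendor.com/stale"}
}

func main() {
	capacity, allocatable, inactive := fakeCM{}.GetDevicePluginResourceCapacity()
	fmt.Println("capacity:   ", capacity)    // feeds node.Status.Capacity
	fmt.Println("allocatable:", allocatable) // feeds node.Status.Allocatable
	fmt.Println("inactive:   ", inactive)    // pruned from node status
}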
diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go
index 646dd658793..e99590dffd6 100644
--- a/pkg/kubelet/cm/deviceplugin/manager.go
+++ b/pkg/kubelet/cm/deviceplugin/manager.go
@@ -73,8 +73,11 @@ type ManagerImpl struct {
 	// e.g. a new device is advertised, two old devices are deleted and a running device fails.
 	callback monitorCallback
 
-	// allDevices contains all of registered resourceNames and their exported device IDs.
-	allDevices map[string]sets.String
+	// healthyDevices contains all of the registered resourceNames and the device IDs they reported as healthy.
+	healthyDevices map[string]sets.String
+
+	// unhealthyDevices contains all of the registered resourceNames and the device IDs they reported as unhealthy.
+	unhealthyDevices map[string]sets.String
 
 	// allocatedDevices contains allocated deviceIds, keyed by resourceName.
 	allocatedDevices map[string]sets.String
@@ -106,7 +109,8 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
 		endpoints:        make(map[string]endpoint),
 		socketname:       file,
 		socketdir:        dir,
-		allDevices:       make(map[string]sets.String),
+		healthyDevices:   make(map[string]sets.String),
+		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       make(podDevices),
 	}
@@ -128,20 +132,24 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
 func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
 	kept := append(updated, added...)
 	m.mutex.Lock()
-	if _, ok := m.allDevices[resourceName]; !ok {
-		m.allDevices[resourceName] = sets.NewString()
+	if _, ok := m.healthyDevices[resourceName]; !ok {
+		m.healthyDevices[resourceName] = sets.NewString()
+	}
+	if _, ok := m.unhealthyDevices[resourceName]; !ok {
+		m.unhealthyDevices[resourceName] = sets.NewString()
 	}
-	// For now, Manager only keeps track of healthy devices.
-	// TODO: adds support to track unhealthy devices.
 	for _, dev := range kept {
 		if dev.Health == pluginapi.Healthy {
-			m.allDevices[resourceName].Insert(dev.ID)
+			m.healthyDevices[resourceName].Insert(dev.ID)
+			m.unhealthyDevices[resourceName].Delete(dev.ID)
 		} else {
-			m.allDevices[resourceName].Delete(dev.ID)
+			m.unhealthyDevices[resourceName].Insert(dev.ID)
+			m.healthyDevices[resourceName].Delete(dev.ID)
 		}
 	}
 	for _, dev := range deleted {
-		m.allDevices[resourceName].Delete(dev.ID)
+		m.healthyDevices[resourceName].Delete(dev.ID)
+		m.unhealthyDevices[resourceName].Delete(dev.ID)
 	}
 	m.mutex.Unlock()
 	m.writeCheckpoint()
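The callback above maintains a simple invariant: every device ID a plugin reports lives in exactly one of healthyDevices and unhealthyDevices, and deleted IDs live in neither. A stripped-down sketch of that bookkeeping, where the local device struct stands in for pluginapi.Device and the "Healthy" literal for pluginapi.Healthy:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// device is a trimmed-down stand-in for pluginapi.Device: an ID plus a
// health string.
type device struct {
	ID     string
	Health string
}

// update applies the same invariant as genericDeviceUpdateCallback: each kept
// device moves into the set matching its health, and deleted devices leave both.
func update(healthy, unhealthy sets.String, kept, deleted []device) {
	for _, d := range kept {
		if d.Health == "Healthy" {
			healthy.Insert(d.ID)
			unhealthy.Delete(d.ID)
		} else {
			unhealthy.Insert(d.ID)
			healthy.Delete(d.ID)
		}
	}
	for _, d := range deleted {
		healthy.Delete(d.ID)
		unhealthy.Delete(d.ID)
	}
}

func main() {
	healthy, unhealthy := sets.NewString(), sets.NewString()
	update(healthy, unhealthy, []device{{"dev1", "Healthy"}, {"dev2", "Unhealthy"}}, nil)
	update(healthy, unhealthy, []device{{"dev1", "Unhealthy"}}, nil) // dev1 flips sets
	fmt.Println(healthy.List(), unhealthy.List())                   // [] [dev1 dev2]
}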
@@ -371,7 +379,8 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 
 // GetCapacity is expected to be called when Kubelet updates its node status.
 // The first returned variable contains the registered device plugin resource capacity.
-// The second returned variable contains previously registered resources that are no longer active.
+// The second returned variable contains the registered device plugin resource allocatable.
+// The third returned variable contains previously registered resources that are no longer active.
 // Kubelet uses this information to update resource capacity/allocatable in its node status.
 // After the call, device plugin can remove the inactive resources from its internal list as the
 // change is already reflected in Kubelet node status.
@@ -380,25 +389,47 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 // cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
 // capacity for already allocated pods so that they can continue to run. However, new pods
 // requiring device plugin resources will not be scheduled till device plugin re-registers.
-func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
+func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
 	needsUpdateCheckpoint := false
 	var capacity = v1.ResourceList{}
+	var allocatable = v1.ResourceList{}
 	var deletedResources []string
 	m.mutex.Lock()
-	for resourceName, devices := range m.allDevices {
+	for resourceName, devices := range m.healthyDevices {
 		if _, ok := m.endpoints[resourceName]; !ok {
-			delete(m.allDevices, resourceName)
+			delete(m.healthyDevices, resourceName)
 			deletedResources = append(deletedResources, resourceName)
 			needsUpdateCheckpoint = true
 		} else {
 			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
+			allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
+		}
+	}
+	for resourceName, devices := range m.unhealthyDevices {
+		if _, ok := m.endpoints[resourceName]; !ok {
+			delete(m.unhealthyDevices, resourceName)
+			alreadyDeleted := false
+			for _, name := range deletedResources {
+				if name == resourceName {
+					alreadyDeleted = true
+				}
+			}
+			if !alreadyDeleted {
+				deletedResources = append(deletedResources, resourceName)
+			}
+			needsUpdateCheckpoint = true
+		} else {
+			capacityCount := capacity[v1.ResourceName(resourceName)]
+			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
+			capacityCount.Add(unhealthyCount)
+			capacity[v1.ResourceName(resourceName)] = capacityCount
 		}
 	}
 	m.mutex.Unlock()
 	if needsUpdateCheckpoint {
 		m.writeCheckpoint()
 	}
-	return capacity, deletedResources
+	return capacity, allocatable, deletedResources
 }
 
 // checkpointData struct is used to store pod to device allocation information
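Condensed, the accounting in GetCapacity is: allocatable counts healthy devices only, while capacity counts healthy plus unhealthy. A simplified sketch of just that arithmetic; it deliberately drops the endpoint check, locking, and checkpointing that the real method performs, and the vendor.com/gpu name is invented:

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/sets"
)

// capacityAndAllocatable mirrors the two loops above: healthy devices count
// toward both lists, unhealthy devices are added to capacity only.
func capacityAndAllocatable(healthy, unhealthy map[string]sets.String) (v1.ResourceList, v1.ResourceList) {
	capacity, allocatable := v1.ResourceList{}, v1.ResourceList{}
	for name, devs := range healthy {
		q := *resource.NewQuantity(int64(devs.Len()), resource.DecimalSI)
		capacity[v1.ResourceName(name)] = q
		allocatable[v1.ResourceName(name)] = q
	}
	for name, devs := range unhealthy {
		// Quantity is a value type: copy, mutate, store back, as the diff does.
		total := capacity[v1.ResourceName(name)]
		total.Add(*resource.NewQuantity(int64(devs.Len()), resource.DecimalSI))
		capacity[v1.ResourceName(name)] = total
	}
	return capacity, allocatable
}

func main() {
	healthy := map[string]sets.String{"vendor.com/gpu": sets.NewString("dev1", "dev2")}
	unhealthy := map[string]sets.String{"vendor.com/gpu": sets.NewString("dev3")}
	c, a := capacityAndAllocatable(healthy, unhealthy)
	fmt.Println(c, a) // gpu: capacity 3, allocatable 2
}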
Requested: %d, Available: %d", resource, needed, available.Len()) } @@ -558,7 +590,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont resource := string(k) needed := int(v.Value()) glog.V(3).Infof("needs %d %s", needed, resource) - _, registeredResource := m.allDevices[resource] + _, registeredResource := m.healthyDevices[resource] _, allocatedResource := m.allocatedDevices[resource] // Continues if this is neither an active device plugin resource nor // a resource we have previously allocated. diff --git a/pkg/kubelet/cm/deviceplugin/manager_stub.go b/pkg/kubelet/cm/deviceplugin/manager_stub.go index 5d7a4b74c5d..9fc77d1b136 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_stub.go +++ b/pkg/kubelet/cm/deviceplugin/manager_stub.go @@ -58,6 +58,6 @@ func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co } // GetCapacity simply returns nil capacity and empty removed resource list. -func (h *ManagerStub) GetCapacity() (v1.ResourceList, []string) { - return nil, []string{} +func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) { + return nil, nil, []string{} } diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go index 88147077c3a..124f690acf7 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_test.go +++ b/pkg/kubelet/cm/deviceplugin/manager_test.go @@ -138,7 +138,7 @@ func cleanup(t *testing.T, m Manager, p *Stub) { m.Stop() } -func TestUpdateCapacity(t *testing.T) { +func TestUpdateCapacityAllocatable(t *testing.T) { testManager, err := newManagerImpl(socketName) as := assert.New(t) as.NotNil(testManager) @@ -156,61 +156,81 @@ func TestUpdateCapacity(t *testing.T) { resourceName1 := "domain1.com/resource1" testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)} callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{}) - capacity, removedResources := testManager.GetCapacity() + capacity, allocatable, removedResources := testManager.GetCapacity() resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] as.True(ok) - as.Equal(int64(2), resource1Capacity.Value()) + resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)] + as.True(ok) + as.Equal(int64(3), resource1Capacity.Value()) + as.Equal(int64(2), resource1Allocatable.Value()) as.Equal(0, len(removedResources)) - // Deletes an unhealthy device should NOT change capacity. + // Deletes an unhealthy device should NOT change allocatable but change capacity. callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) - capacity, removedResources = testManager.GetCapacity() + capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) + resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] + as.True(ok) as.Equal(int64(2), resource1Capacity.Value()) + as.Equal(int64(2), resource1Allocatable.Value()) as.Equal(0, len(removedResources)) - // Updates a healthy device to unhealthy should reduce capacity by 1. + // Updates a healthy device to unhealthy should reduce allocatable by 1. 
diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go
index 88147077c3a..124f690acf7 100644
--- a/pkg/kubelet/cm/deviceplugin/manager_test.go
+++ b/pkg/kubelet/cm/deviceplugin/manager_test.go
@@ -138,7 +138,7 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
 	m.Stop()
 }
 
-func TestUpdateCapacity(t *testing.T) {
+func TestUpdateCapacityAllocatable(t *testing.T) {
 	testManager, err := newManagerImpl(socketName)
 	as := assert.New(t)
 	as.NotNil(testManager)
@@ -156,61 +156,79 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	resourceName1 := "domain1.com/resource1"
 	testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
 	callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
-	capacity, removedResources := testManager.GetCapacity()
+	capacity, allocatable, removedResources := testManager.GetCapacity()
 	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
-	as.Equal(int64(2), resource1Capacity.Value())
+	resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
+	as.True(ok)
+	as.Equal(int64(3), resource1Capacity.Value())
+	as.Equal(int64(2), resource1Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
-	// Deletes an unhealthy device should NOT change capacity.
+	// Deleting an unhealthy device should NOT change allocatable but should reduce capacity by 1.
 	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
-	capacity, removedResources = testManager.GetCapacity()
+	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
+	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
+	as.True(ok)
 	as.Equal(int64(2), resource1Capacity.Value())
+	as.Equal(int64(2), resource1Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
-	// Updates a healthy device to unhealthy should reduce capacity by 1.
+	// Updating a healthy device to unhealthy should reduce allocatable by 1 but leave capacity unchanged.
 	dev2 := devs[1]
 	dev2.Health = pluginapi.Unhealthy
 	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
-	capacity, removedResources = testManager.GetCapacity()
+	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
-	as.Equal(int64(1), resource1Capacity.Value())
+	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
+	as.True(ok)
+	as.Equal(int64(2), resource1Capacity.Value())
+	as.Equal(int64(1), resource1Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
-	// Deletes a healthy device should reduce capacity by 1.
+	// Deleting a healthy device should reduce both capacity and allocatable by 1.
 	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
-	capacity, removedResources = testManager.GetCapacity()
+	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
-	as.Equal(int64(0), resource1Capacity.Value())
+	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
+	as.True(ok)
+	as.Equal(int64(0), resource1Allocatable.Value())
+	as.Equal(int64(1), resource1Capacity.Value())
 	as.Equal(0, len(removedResources))
 
 	// Tests adding another resource.
 	resourceName2 := "resource2"
 	testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
 	callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
-	capacity, removedResources = testManager.GetCapacity()
+	capacity, allocatable, removedResources = testManager.GetCapacity()
 	as.Equal(2, len(capacity))
 	resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
 	as.True(ok)
-	as.Equal(int64(2), resource2Capacity.Value())
+	resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
+	as.True(ok)
+	as.Equal(int64(3), resource2Capacity.Value())
+	as.Equal(int64(2), resource2Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
 	// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
-	// is removed from capacity and it no longer exists in allDevices after the call.
+	// is removed from capacity and no longer exists in healthyDevices after the call.
 	delete(testManager.endpoints, resourceName1)
-	capacity, removed := testManager.GetCapacity()
+	capacity, allocatable, removed := testManager.GetCapacity()
 	as.Equal([]string{resourceName1}, removed)
 	_, ok = capacity[v1.ResourceName(resourceName1)]
 	as.False(ok)
 	val, ok := capacity[v1.ResourceName(resourceName2)]
 	as.True(ok)
-	as.Equal(int64(2), val.Value())
-	_, ok = testManager.allDevices[resourceName1]
+	as.Equal(int64(3), val.Value())
+	_, ok = testManager.healthyDevices[resourceName1]
 	as.False(ok)
+	_, ok = testManager.unhealthyDevices[resourceName1]
+	as.False(ok)
+	as.Equal(1, len(removed))
 }
 
 type stringPairType struct {
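When an endpoint disappears, its resource may still have entries in both healthyDevices and unhealthyDevices; the alreadyDeleted check in GetCapacity keeps it from appearing twice in the removed list, which the as.Equal(1, len(removed)) assertion above pins down. That dedupe logic in isolation, as a sketch:

package main

import "fmt"

// dedupe mirrors the alreadyDeleted scan in GetCapacity: a resource that had
// both healthy and unhealthy devices is reported as removed only once.
func dedupe(names []string) []string {
	seen := make(map[string]bool)
	out := []string{}
	for _, n := range names {
		if !seen[n] {
			seen[n] = true
			out = append(out, n)
		}
	}
	return out
}

func main() {
	// Both loops in GetCapacity would otherwise append "domain1.com/resource1".
	fmt.Println(dedupe([]string{"domain1.com/resource1", "domain1.com/resource1"}))
}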
@@ -259,7 +279,7 @@ func TestCheckpoint(t *testing.T) {
 	defer os.RemoveAll(tmpDir)
 	testManager := &ManagerImpl{
 		socketdir:        tmpDir,
-		allDevices:       make(map[string]sets.String),
+		healthyDevices:   make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       make(podDevices),
 	}
@@ -283,19 +303,19 @@ func TestCheckpoint(t *testing.T) {
 		constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
 			map[string]string{"/home/r1lib1": "/usr/r1lib1"},
 			map[string]string{}))
 
-	testManager.allDevices[resourceName1] = sets.NewString()
-	testManager.allDevices[resourceName1].Insert("dev1")
-	testManager.allDevices[resourceName1].Insert("dev2")
-	testManager.allDevices[resourceName1].Insert("dev3")
-	testManager.allDevices[resourceName1].Insert("dev4")
-	testManager.allDevices[resourceName1].Insert("dev5")
-	testManager.allDevices[resourceName2] = sets.NewString()
-	testManager.allDevices[resourceName2].Insert("dev1")
-	testManager.allDevices[resourceName2].Insert("dev2")
+	testManager.healthyDevices[resourceName1] = sets.NewString()
+	testManager.healthyDevices[resourceName1].Insert("dev1")
+	testManager.healthyDevices[resourceName1].Insert("dev2")
+	testManager.healthyDevices[resourceName1].Insert("dev3")
+	testManager.healthyDevices[resourceName1].Insert("dev4")
+	testManager.healthyDevices[resourceName1].Insert("dev5")
+	testManager.healthyDevices[resourceName2] = sets.NewString()
+	testManager.healthyDevices[resourceName2].Insert("dev1")
+	testManager.healthyDevices[resourceName2].Insert("dev2")
 
 	expectedPodDevices := testManager.podDevices
 	expectedAllocatedDevices := testManager.podDevices.devices()
-	expectedAllDevices := testManager.allDevices
+	expectedAllDevices := testManager.healthyDevices
 
 	err = testManager.writeCheckpoint()
@@ -320,7 +340,7 @@ func TestCheckpoint(t *testing.T) {
 		}
 	}
 	as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
-	as.True(reflect.DeepEqual(expectedAllDevices, testManager.allDevices))
+	as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
 }
 
 type activePodsStub struct {
@@ -377,7 +397,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
 	testManager := &ManagerImpl{
 		socketdir:        tmpDir,
 		callback:         monitorCallback,
-		allDevices:       make(map[string]sets.String),
+		healthyDevices:   make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		endpoints:        make(map[string]endpoint),
 		podDevices:       make(podDevices),
@@ -386,9 +406,9 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
 	}
 	testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
 	for _, res := range testRes {
-		testManager.allDevices[res.resourceName] = sets.NewString()
+		testManager.healthyDevices[res.resourceName] = sets.NewString()
 		for _, dev := range res.devs {
-			testManager.allDevices[res.resourceName].Insert(dev)
+			testManager.healthyDevices[res.resourceName].Insert(dev)
 		}
 		if res.resourceName == "domain1.com/resource1" {
 			testManager.endpoints[res.resourceName] = &MockEndpoint{
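As the TODO in readCheckpoint notes, only healthy device IDs are persisted today; after a kubelet restart everything read back lands in healthyDevices until each plugin's next ListAndWatch update corrects the health state. A pared-down sketch of that round trip, where checkpointData is a stand-in carrying just the field this PR touches:

package main

import (
	"encoding/json"
	"fmt"
)

// checkpointData is a trimmed stand-in for the manager's checkpoint struct:
// only registered (healthy) device IDs are persisted.
type checkpointData struct {
	RegisteredDevices map[string][]string
}

func main() {
	out, err := json.Marshal(checkpointData{
		RegisteredDevices: map[string][]string{"domain1.com/resource1": {"dev1", "dev2"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))

	var in checkpointData
	if err := json.Unmarshal(out, &in); err != nil {
		panic(err)
	}
	// After restart these IDs all repopulate healthyDevices; health is
	// re-learned from the plugin's next update.
	fmt.Println(in.RegisteredDevices)
}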
@@ -675,7 +695,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
 
 	testManager := &ManagerImpl{
 		callback:         monitorCallback,
-		allDevices:       make(map[string]sets.String),
+		healthyDevices:   make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       make(podDevices),
 	}
diff --git a/pkg/kubelet/cm/deviceplugin/types.go b/pkg/kubelet/cm/deviceplugin/types.go
index 3c6b30206f1..4dc12a5d8dd 100644
--- a/pkg/kubelet/cm/deviceplugin/types.go
+++ b/pkg/kubelet/cm/deviceplugin/types.go
@@ -53,9 +53,9 @@ type Manager interface {
 	// for the found one. An empty struct is returned in case no cached state is found.
 	GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
 
-	// GetCapacity returns the amount of available device plugin resource capacity
+	// GetCapacity returns the available device plugin resource capacity, the resource allocatable,
 	// and inactive device plugin resources previously registered on the node.
-	GetCapacity() (v1.ResourceList, []string)
+	GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
 }
 
 // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index 7094b66f996..f934d468200 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -548,6 +548,10 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 		}
 	}
 
+	var devicePluginAllocatable v1.ResourceList
+	var devicePluginCapacity v1.ResourceList
+	var removedDevicePlugins []string
+
 	// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
 	// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
 	info, err := kl.GetCachedMachineInfo()
@@ -592,13 +596,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 			}
 		}
 
-		devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity()
+		devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
 		if devicePluginCapacity != nil {
 			for k, v := range devicePluginCapacity {
 				glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
 				node.Status.Capacity[k] = v
 			}
 		}
+
 		for _, removedResource := range removedDevicePlugins {
 			glog.V(2).Infof("Remove capacity for %s", removedResource)
 			delete(node.Status.Capacity, v1.ResourceName(removedResource))
@@ -629,6 +634,12 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 		}
 		node.Status.Allocatable[k] = value
 	}
+	if devicePluginAllocatable != nil {
+		for k, v := range devicePluginAllocatable {
+			glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
+			node.Status.Allocatable[k] = v
+		}
+	}
 	// for every huge page reservation, we need to remove it from allocatable memory
 	for k, v := range node.Status.Capacity {
 		if v1helper.IsHugePageResourceName(k) {
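Ordering matters in setNodeStatusMachineInfo: the generic loop first defaults allocatable to capacity for extended resources, and the device plugin pass runs afterwards, so the healthy-only count wins. A toy model of those two passes; the vendor.com/gpu name and the counts are invented, while the v1 and resource types are the real ones the file already uses:

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	q := func(n int64) resource.Quantity { return *resource.NewQuantity(n, resource.DecimalSI) }

	// Generic pass: allocatable starts out equal to capacity.
	capacity := v1.ResourceList{"vendor.com/gpu": q(3)}
	allocatable := v1.ResourceList{}
	for k, v := range capacity {
		allocatable[k] = v
	}

	// Device plugin pass runs afterwards, overwriting with the healthy-only count.
	devicePluginAllocatable := v1.ResourceList{"vendor.com/gpu": q(2)}
	for k, v := range devicePluginAllocatable {
		allocatable[k] = v
	}
	fmt.Println(capacity, allocatable) // gpu: capacity 3, allocatable 2
}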