Extends deviceplugin to gracefully handle full device plugin lifecycle.

- Instead of using the cm.capacity field to communicate device plugin resource
capacity, this PR switches to an explicit cm.GetDevicePluginResourceCapacity()
function that returns device plugin resource capacity as well as any inactive
device plugin resources. Kubelet's syncNodeStatus calls this function during its
periodic run to update node status capacity and allocatable. After this call,
the device plugin manager can remove the inactive resources from its allDevices
field, as the update has already been pushed to the API server (sketched below).
- Extends the device plugin checkpoint data to record registered resources
so that resource removal can complete even across kubelet restarts.
- Passes sourcesReady from the kubelet to the device plugin manager to avoid
removing inactive pods during the grace period of a kubelet restart.
Jiaying Zhang 2017-10-23 16:18:49 -07:00
parent 9fe2a62b90
commit 1eb4e79453
14 changed files with 245 additions and 97 deletions
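To make the new flow concrete, below is a condensed, stand-alone sketch of the consumer side, adapted from the kubelet_node_status.go hunk at the end of this diff. The resource names and quantities are made up for illustration; only the GetDevicePluginResourceCapacity() call pattern mirrors the change.

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// syncDevicePluginCapacity mirrors the node status update pattern introduced
// here: active device plugin resources come back as a ResourceList, while
// resources whose endpoints have gone away come back as names to delete.
func syncDevicePluginCapacity(node *v1.Node, capacity v1.ResourceList, removed []string) {
	for k, v := range capacity {
		node.Status.Capacity[k] = v
	}
	for _, r := range removed {
		delete(node.Status.Capacity, v1.ResourceName(r))
	}
}

func main() {
	node := &v1.Node{}
	node.Status.Capacity = v1.ResourceList{
		"example.com/stale-resource": *resource.NewQuantity(2, resource.DecimalSI),
	}
	// In the kubelet, this pair would come from
	// containerManager.GetDevicePluginResourceCapacity().
	capacity := v1.ResourceList{
		"example.com/active-resource": *resource.NewQuantity(4, resource.DecimalSI),
	}
	removed := []string{"example.com/stale-resource"}
	syncDevicePluginCapacity(node, capacity, removed)
	fmt.Println(node.Status.Capacity) // only example.com/active-resource remains
}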

View File

@ -34,6 +34,7 @@ go_library(
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",

View File

@ -23,6 +23,7 @@ import (
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -41,7 +42,7 @@ type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error
Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error
// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
@ -69,6 +70,10 @@ type ContainerManager interface {
// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList
// GetDevicePluginResourceCapacity returns the amount of device plugin resources available on the node
// and inactive device plugin resources previously registered on the node.
GetDevicePluginResourceCapacity() (v1.ResourceList, []string)
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error
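The new config.SourcesReady parameter is the kubelet's signal that all pod configuration sources (apiserver, file, http) have been seen. Its exact definition lives in pkg/kubelet/config and is not part of this diff; the sketch below only mirrors the two methods that the sourcesReadyStub added later in this diff implements, so treat it as illustrative.

package main

import "fmt"

// SourcesReady mirrors the two methods the device plugin manager relies on;
// the real interface is defined in pkg/kubelet/config.
type SourcesReady interface {
	AddSource(source string)
	AllReady() bool
}

// alwaysReady behaves like the sourcesReadyStub in this diff: every source is
// considered ready, so it never blocks cleanup of checkpointed pod state.
type alwaysReady struct{}

func (alwaysReady) AddSource(source string) {}
func (alwaysReady) AllReady() bool          { return true }

func main() {
	var sr SourcesReady = alwaysReady{}
	// The device plugin manager calls AllReady() before purging inactive pods,
	// which is how the kubelet-restart grace period is honored.
	fmt.Println("all sources ready:", sr.AllReady())
}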

View File

@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/qos"
@ -262,21 +263,9 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
qosContainerManager: qosContainerManager,
}
updateDeviceCapacityFunc := func(updates v1.ResourceList) {
cm.Lock()
defer cm.Unlock()
for k, v := range updates {
if v.Value() <= 0 {
delete(cm.capacity, k)
} else {
cm.capacity[k] = v
}
}
}
glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc)
cm.devicePluginManager, err = deviceplugin.NewManagerImpl()
} else {
cm.devicePluginManager, err = deviceplugin.NewManagerStub()
}
@ -530,6 +519,7 @@ func (cm *containerManagerImpl) Status() Status {
func (cm *containerManagerImpl) Start(node *v1.Node,
activePods ActivePodsFunc,
sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService) error {
@ -597,7 +587,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}, time.Second, stopChan)
// Starts device plugin manager.
if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods), sourcesReady); err != nil {
return err
}
return nil
@ -896,3 +886,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
defer cm.RUnlock()
return cm.capacity
}
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
return cm.devicePluginManager.GetCapacity()
}

View File

@ -22,6 +22,7 @@ import (
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/status"
@ -32,7 +33,7 @@ type containerManagerStub struct{}
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
glog.V(2).Infof("Starting stub container manager")
return nil
}
@ -69,6 +70,10 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return nil
}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
return nil, []string{}
}
func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}

View File

@ -26,6 +26,7 @@ import (
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/status"
@ -38,7 +39,7 @@ type unsupportedContainerManager struct {
var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}
@ -74,6 +75,10 @@ func (cm *unsupportedContainerManager) GetCapacity() v1.ResourceList {
return nil
}
func (cm *unsupportedContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) {
return nil, []string{}
}
func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerManager {
return &unsupportedPodContainerManager{}
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/client-go/tools/record"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/util/mount"
)
@ -35,7 +36,7 @@ type containerManagerImpl struct {
var _ ContainerManager = &containerManagerImpl{}
func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
glog.V(2).Infof("Starting Windows stub container manager")
return nil
}

View File

@ -20,6 +20,7 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
@ -61,6 +62,10 @@ type ManagerImpl struct {
// could be counted when updating allocated devices
activePods ActivePodsFunc
// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
// We use it to determine when we can purge inactive pods from checkpointed state.
sourcesReady config.SourcesReady
// callback is used for updating devices' states in one time call.
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback
@ -75,13 +80,17 @@ type ManagerImpl struct {
podDevices podDevices
}
// NewManagerImpl creates a new manager. updateCapacityFunc is called to
// update ContainerManager capacity when device capacity changes.
func NewManagerImpl(updateCapacityFunc func(v1.ResourceList)) (*ManagerImpl, error) {
return newManagerImpl(updateCapacityFunc, pluginapi.KubeletSocket)
type sourcesReadyStub struct{}
func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket)
}
func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) (*ManagerImpl, error) {
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) {
@ -97,34 +106,36 @@ func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string)
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
manager.callback = func(resourceName string, added, updated, deleted []pluginapi.Device) {
var capacity = v1.ResourceList{}
// The following fields are populated with real implementations in manager.Start().
// Before that, they are initialized to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
}
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
kept := append(updated, added...)
manager.mutex.Lock()
defer manager.mutex.Unlock()
if _, ok := manager.allDevices[resourceName]; !ok {
manager.allDevices[resourceName] = sets.NewString()
m.mutex.Lock()
if _, ok := m.allDevices[resourceName]; !ok {
m.allDevices[resourceName] = sets.NewString()
}
// For now, Manager only keeps track of healthy devices.
// We can revisit this later when the need comes to track unhealthy devices here.
// TODO: add support to track unhealthy devices.
for _, dev := range kept {
if dev.Health == pluginapi.Healthy {
manager.allDevices[resourceName].Insert(dev.ID)
m.allDevices[resourceName].Insert(dev.ID)
} else {
manager.allDevices[resourceName].Delete(dev.ID)
m.allDevices[resourceName].Delete(dev.ID)
}
}
for _, dev := range deleted {
manager.allDevices[resourceName].Delete(dev.ID)
m.allDevices[resourceName].Delete(dev.ID)
}
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(manager.allDevices[resourceName].Len()), resource.DecimalSI)
updateCapacityFunc(capacity)
}
return manager, nil
m.mutex.Unlock()
m.writeCheckpoint()
}
func (m *ManagerImpl) removeContents(dir string) error {
@ -171,10 +182,11 @@ func (m *ManagerImpl) checkpointFile() string {
// Start starts the Device Plugin Manager and starts initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc) error {
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
glog.V(2).Infof("Starting Device Plugin manager")
m.activePods = activePods
m.sourcesReady = sourcesReady
// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
@ -238,6 +250,9 @@ func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
}
}
m.mutex.Lock()
defer m.mutex.Unlock()
// quick return if no pluginResources requested
if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
return nil
@ -334,8 +349,6 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
glog.V(2).Infof("Delete resource for endpoint %v", e)
delete(m.endpoints, r.ResourceName)
// Issues callback to delete all of devices.
e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices())
}
glog.V(2).Infof("Unregistered endpoint %v", e)
@ -343,10 +356,56 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
}()
}
// GetCapacity is expected to be called when Kubelet updates its node status.
// The first returned variable contains the registered device plugin resource capacity.
// The second returned variable contains previously registered resources that are no longer active.
// Kubelet uses this information to update resource capacity/allocatable in its node status.
// After the call, the device plugin manager can remove the inactive resources from its internal list as the
// change is already reflected in the Kubelet node status.
// Note that, in the special case after Kubelet restarts, device plugin resource capacities can
// temporarily drop to zero until the corresponding device plugins re-register. This is OK because
// the cm.UpdatePluginResource() call made during predicate Admit guarantees we adjust nodeinfo
// capacity for already allocated pods so that they can continue to run. However, new pods
// requiring device plugin resources will not be scheduled until the device plugin re-registers.
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
needsUpdateCheckpoint := false
var capacity = v1.ResourceList{}
var deletedResources []string
m.mutex.Lock()
for resourceName, devices := range m.allDevices {
if _, ok := m.endpoints[resourceName]; !ok {
delete(m.allDevices, resourceName)
deletedResources = append(deletedResources, resourceName)
needsUpdateCheckpoint = true
} else {
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
}
}
m.mutex.Unlock()
if needsUpdateCheckpoint {
m.writeCheckpoint()
}
return capacity, deletedResources
}
// checkpointData struct is used to store pod to device allocation information
// and registered device information in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
PodDeviceEntries []podDevicesCheckpointEntry
RegisteredDevices map[string][]string
}
// Checkpoints device to container allocation information to disk.
func (m *ManagerImpl) writeCheckpoint() error {
m.mutex.Lock()
data := m.podDevices.toCheckpointData()
data := checkpointData{
PodDeviceEntries: m.podDevices.toCheckpointData(),
RegisteredDevices: make(map[string][]string),
}
for resource, devices := range m.allDevices {
data.RegisteredDevices[resource] = devices.UnsortedList()
}
m.mutex.Unlock()
dataJSON, err := json.Marshal(data)
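Assuming the checkpoint file is still serialized with encoding/json as in the hunk above, the stand-alone sketch below approximates the new on-disk layout. The struct fields mirror this diff; the pod UID, device IDs, and AllocResp bytes are purely illustrative.

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-alone mirrors of the checkpoint types from this diff, for
// illustration only; the real ones live in the deviceplugin package.
type podDevicesCheckpointEntry struct {
	PodUID        string
	ContainerName string
	ResourceName  string
	DeviceIDs     []string
	AllocResp     []byte
}

type checkpointData struct {
	PodDeviceEntries  []podDevicesCheckpointEntry
	RegisteredDevices map[string][]string
}

func main() {
	// Hypothetical content: one pod holding one device, plus the full set of
	// registered devices so removal can finish even across kubelet restarts.
	data := checkpointData{
		PodDeviceEntries: []podDevicesCheckpointEntry{{
			PodUID:        "pod1",
			ContainerName: "con1",
			ResourceName:  "domain1.com/resource1",
			DeviceIDs:     []string{"dev1"},
			AllocResp:     []byte("..."), // serialized AllocateResponse; base64-encoded in the JSON output
		}},
		RegisteredDevices: map[string][]string{
			"domain1.com/resource1": {"dev1", "dev2"},
		},
	}
	out, err := json.Marshal(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}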
@ -373,14 +432,23 @@ func (m *ManagerImpl) readCheckpoint() error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.podDevices.fromCheckpointData(data)
m.podDevices.fromCheckpointData(data.PodDeviceEntries)
m.allocatedDevices = m.podDevices.devices()
for resource, devices := range data.RegisteredDevices {
m.allDevices[resource] = sets.NewString()
for _, dev := range devices {
m.allDevices[resource].Insert(dev)
}
}
return nil
}
// updateAllocatedDevices gets a list of active pods and then frees any devices that are bound to
// terminated pods.
func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
if !m.sourcesReady.AllReady() {
return
}
m.mutex.Lock()
defer m.mutex.Unlock()
activePodUids := sets.NewString()
@ -392,7 +460,7 @@ func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
if len(podsToBeRemoved) <= 0 {
return
}
glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
glog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
m.podDevices.delete(podsToBeRemoved.List())
// Regenerate allocatedDevices after we update pod allocation information.
m.allocatedDevices = m.podDevices.devices()
@ -420,6 +488,11 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
// No change, no work.
return nil, nil
}
glog.V(3).Infof("Needs to allocate %v %v for pod %q container %q", needed, resource, podUID, contName)
// Needs to allocate additional devices.
if _, ok := m.allDevices[resource]; !ok {
return nil, fmt.Errorf("can't allocate unregistered device %v", resource)
}
devices = sets.NewString()
// Needs to allocate additional devices.
if m.allocatedDevices[resource] == nil {
@ -455,7 +528,11 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
resource := string(k)
needed := int(v.Value())
glog.V(3).Infof("needs %d %s", needed, resource)
if _, registeredResource := m.allDevices[resource]; !registeredResource {
_, registeredResource := m.allDevices[resource]
_, allocatedResource := m.allocatedDevices[resource]
// Continues if this is neither an active device plugin resource nor
// a resource we have previously allocated.
if !registeredResource && !allocatedResource {
continue
}
// Updates allocatedDevices to garbage collect any stranded resources

View File

@ -19,6 +19,7 @@ package deviceplugin
import (
"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
@ -32,7 +33,7 @@ func NewManagerStub() (*ManagerStub, error) {
}
// Start simply returns nil.
func (h *ManagerStub) Start(activePods ActivePodsFunc) error {
func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
return nil
}
@ -55,3 +56,8 @@ func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
return nil
}
// GetCapacity simply returns nil capacity and empty removed resource list.
func (h *ManagerStub) GetCapacity() (v1.ResourceList, []string) {
return nil, []string{}
}

View File

@ -43,8 +43,7 @@ const (
)
func TestNewManagerImpl(t *testing.T) {
verifyCapacityFunc := func(updates v1.ResourceList) {}
_, err := newManagerImpl(verifyCapacityFunc, socketName)
_, err := newManagerImpl(socketName)
require.NoError(t, err)
}
@ -123,8 +122,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) {
updateCapacity := func(v1.ResourceList) {}
m, err := newManagerImpl(updateCapacity, socketName)
m, err := newManagerImpl(socketName)
require.NoError(t, err)
m.callback = callback
@ -132,7 +130,7 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Ma
activePods := func() []*v1.Pod {
return []*v1.Pod{}
}
err = m.Start(activePods)
err = m.Start(activePods, &sourcesReadyStub{})
require.NoError(t, err)
p := NewDevicePluginStub(devs, pluginSocketName)
@ -148,12 +146,8 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
}
func TestUpdateCapacity(t *testing.T) {
var expected = v1.ResourceList{}
testManager, err := newManagerImpl(socketName)
as := assert.New(t)
verifyCapacityFunc := func(updates v1.ResourceList) {
as.Equal(expected, updates)
}
testManager, err := newManagerImpl(verifyCapacityFunc, socketName)
as.NotNil(testManager)
as.Nil(err)
@ -162,23 +156,68 @@ func TestUpdateCapacity(t *testing.T) {
{ID: "Device2", Health: pluginapi.Healthy},
{ID: "Device3", Health: pluginapi.Unhealthy},
}
callback := testManager.genericDeviceUpdateCallback
resourceName := "resource1"
// Adds three devices for resource1, two healthy and one unhealthy.
// Expects capacity for resource1 to be 2.
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
testManager.callback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
resourceName1 := "domain1.com/resource1"
testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources := testManager.GetCapacity()
resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
as.Equal(0, len(removedResources))
// Deleting an unhealthy device should NOT change capacity.
testManager.callback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
capacity, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(2), resource1Capacity.Value())
as.Equal(0, len(removedResources))
// Updating a healthy device to unhealthy should reduce capacity by 1.
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
dev2 := devs[1]
dev2.Health = pluginapi.Unhealthy
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(1), resource1Capacity.Value())
as.Equal(0, len(removedResources))
// Deleting a healthy device should reduce capacity by 1.
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
capacity, removedResources = testManager.GetCapacity()
resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
as.True(ok)
as.Equal(int64(0), resource1Capacity.Value())
as.Equal(0, len(removedResources))
// Tests adding another resource.
delete(expected, v1.ResourceName(resourceName))
resourceName2 := "resource2"
expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
testManager.callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
as.Equal(2, len(capacity))
resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), resource2Capacity.Value())
as.Equal(0, len(removedResources))
// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
// is removed from capacity and it no longer exists in allDevices after the call.
delete(testManager.endpoints, resourceName1)
capacity, removed := testManager.GetCapacity()
as.Equal([]string{resourceName1}, removed)
_, ok = capacity[v1.ResourceName(resourceName1)]
as.False(ok)
val, ok := capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), val.Value())
_, ok = testManager.allDevices[resourceName1]
as.False(ok)
}
type stringPairType struct {
@ -245,8 +284,19 @@ func TestCheckpoint(t *testing.T) {
constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
testManager.allDevices[resourceName1] = sets.NewString()
testManager.allDevices[resourceName1].Insert("dev1")
testManager.allDevices[resourceName1].Insert("dev2")
testManager.allDevices[resourceName1].Insert("dev3")
testManager.allDevices[resourceName1].Insert("dev4")
testManager.allDevices[resourceName1].Insert("dev5")
testManager.allDevices[resourceName2] = sets.NewString()
testManager.allDevices[resourceName2].Insert("dev1")
testManager.allDevices[resourceName2].Insert("dev2")
expectedPodDevices := testManager.podDevices
expectedAllocatedDevices := testManager.podDevices.devices()
expectedAllDevices := testManager.allDevices
err := testManager.writeCheckpoint()
as := assert.New(t)
@ -272,6 +322,7 @@ func TestCheckpoint(t *testing.T) {
}
}
as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
as.True(reflect.DeepEqual(expectedAllDevices, testManager.allDevices))
}
type activePodsStub struct {
@ -341,6 +392,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
endpoints: make(map[string]endpoint),
podDevices: make(podDevices),
activePods: podsStub.getActivePods,
sourcesReady: &sourcesReadyStub{},
}
testManager.allDevices[resourceName1] = sets.NewString()

View File

@ -94,7 +94,8 @@ func (pdev podDevices) devices() map[string]sets.String {
return ret
}
type checkpointEntry struct {
// podDevicesCheckpointEntry is used to record <pod, container> to device allocation information.
type podDevicesCheckpointEntry struct {
PodUID string
ContainerName string
ResourceName string
@ -102,16 +103,9 @@ type checkpointEntry struct {
AllocResp []byte
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
Entries []checkpointEntry
}
// Turns podDevices into a list of checkpoint entries.
func (pdev podDevices) toCheckpointData() checkpointData {
var data checkpointData
func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
var data []podDevicesCheckpointEntry
for podUID, containerDevices := range pdev {
for conName, resources := range containerDevices {
for resource, devices := range resources {
@ -126,7 +120,7 @@ func (pdev podDevices) toCheckpointData() checkpointData {
glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
continue
}
data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resource, devIds, allocResp})
data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp})
}
}
}
@ -134,8 +128,8 @@ func (pdev podDevices) toCheckpointData() checkpointData {
}
// Populates podDevices from the passed-in checkpoint entries.
func (pdev podDevices) fromCheckpointData(data checkpointData) {
for _, entry := range data.Entries {
func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) {
for _, entry := range data {
glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
devIDs := sets.NewString()

View File

@ -19,6 +19,7 @@ package deviceplugin
import (
"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -27,7 +28,7 @@ import (
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc) error
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
// Devices is the map of devices that have registered themselves
// against the manager.
@ -51,6 +52,10 @@ type Manager interface {
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
// GetCapacity returns the amount of available device plugin resource capacity
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, []string)
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

View File

@ -1304,7 +1304,7 @@ func (kl *Kubelet) initializeModules() error {
return fmt.Errorf("Kubelet failed to get node info: %v", err)
}
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil {
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}

View File

@ -601,14 +601,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}
currentCapacity := kl.containerManager.GetCapacity()
if currentCapacity != nil {
for k, v := range currentCapacity {
if v1helper.IsExtendedResourceName(k) {
devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
node.Status.Capacity[k] = v
}
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Remove capacity for %s", removedResource)
delete(node.Status.Capacity, v1.ResourceName(removedResource))
}
}