diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index e314eb9ddb5..127192c637d 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -34,6 +34,7 @@ go_library( "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/cm/cpumanager:go_default_library", + "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 4fa45c4d95a..dfdcf8d82c8 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -23,6 +23,7 @@ import ( // TODO: Migrate kubelet to either use its own internal objects or client library. "k8s.io/api/core/v1" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -41,7 +42,7 @@ type ContainerManager interface { // Runs the container manager's housekeeping. // - Ensures that the Docker daemon is in a container. // - Creates the system container where all non-containerized processes run. - Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error + Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error // SystemCgroupsLimit returns resources allocated to system cgroups in the machine. // These cgroups include the system and Kubernetes services. @@ -69,6 +70,10 @@ type ContainerManager interface { // GetCapacity returns the amount of compute resources tracked by container manager available on the node. GetCapacity() v1.ResourceList + // GetDevicePluginResourceCapacity returns the amount of device plugin resources available on the node + // and inactive device plugin resources previously registered on the node. 
+ GetDevicePluginResourceCapacity() (v1.ResourceList, []string) + // UpdateQOSCgroups performs housekeeping updates to ensure that the top // level QoS containers have their desired state in a thread-safe way UpdateQOSCgroups() error diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 870e4900e1e..6c6c7068172 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/qos" @@ -262,21 +263,9 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I qosContainerManager: qosContainerManager, } - updateDeviceCapacityFunc := func(updates v1.ResourceList) { - cm.Lock() - defer cm.Unlock() - for k, v := range updates { - if v.Value() <= 0 { - delete(cm.capacity, k) - } else { - cm.capacity[k] = v - } - } - } - glog.Infof("Creating device plugin manager: %t", devicePluginEnabled) if devicePluginEnabled { - cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc) + cm.devicePluginManager, err = deviceplugin.NewManagerImpl() } else { cm.devicePluginManager, err = deviceplugin.NewManagerStub() } @@ -530,6 +519,7 @@ func (cm *containerManagerImpl) Status() Status { func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc, + sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService) error { @@ -597,7 +587,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node, }, time.Second, stopChan) // Starts device plugin manager. 
- if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil { + if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods), sourcesReady); err != nil { return err } return nil @@ -896,3 +886,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList { defer cm.RUnlock() return cm.capacity } + +func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) { + return cm.devicePluginManager.GetCapacity() +} diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index beaf57a342e..27a86849582 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -22,6 +22,7 @@ import ( internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/status" @@ -32,7 +33,7 @@ type containerManagerStub struct{} var _ ContainerManager = &containerManagerStub{} -func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { +func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { glog.V(2).Infof("Starting stub container manager") return nil } @@ -69,6 +70,10 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList { return nil } +func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) { + return nil, []string{} +} + func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager { return &podContainerManagerStub{} } diff --git a/pkg/kubelet/cm/container_manager_unsupported.go b/pkg/kubelet/cm/container_manager_unsupported.go index f3937703686..6453397b177 100644 --- a/pkg/kubelet/cm/container_manager_unsupported.go +++ b/pkg/kubelet/cm/container_manager_unsupported.go @@ -26,6 +26,7 @@ import ( internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/status" @@ -38,7 +39,7 @@ type unsupportedContainerManager struct { var _ ContainerManager = &unsupportedContainerManager{} -func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { +func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { return fmt.Errorf("Container Manager is unsupported in this build") } @@ -74,6 +75,10 @@ func (cm *unsupportedContainerManager) GetCapacity() v1.ResourceList { return nil } +func (cm *unsupportedContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, []string) { + return nil, []string{} +} + func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerManager { return &unsupportedPodContainerManager{} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 8102ceca2dd..56bdbb9d363 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -25,6 +25,7 @@ import ( 
"k8s.io/client-go/tools/record" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/util/mount" ) @@ -35,7 +36,7 @@ type containerManagerImpl struct { var _ ContainerManager = &containerManagerImpl{} -func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { +func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { glog.V(2).Infof("Starting Windows stub container manager") return nil } diff --git a/pkg/kubelet/cm/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD index 341b59691a7..29702ab64c8 100644 --- a/pkg/kubelet/cm/deviceplugin/BUILD +++ b/pkg/kubelet/cm/deviceplugin/BUILD @@ -20,6 +20,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", + "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go index be46973ef74..b30e0fc4d5c 100644 --- a/pkg/kubelet/cm/deviceplugin/manager.go +++ b/pkg/kubelet/cm/deviceplugin/manager.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -61,6 +62,10 @@ type ManagerImpl struct { // could be counted when updating allocated devices activePods ActivePodsFunc + // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness. + // We use it to determine when we can purge inactive pods from checkpointed state. + sourcesReady config.SourcesReady + // callback is used for updating devices' states in one time call. // e.g. a new device is advertised, two old devices are deleted and a running device fails. callback monitorCallback @@ -75,13 +80,17 @@ type ManagerImpl struct { podDevices podDevices } -// NewManagerImpl creates a new manager. updateCapacityFunc is called to -// update ContainerManager capacity when device capacity changes. -func NewManagerImpl(updateCapacityFunc func(v1.ResourceList)) (*ManagerImpl, error) { - return newManagerImpl(updateCapacityFunc, pluginapi.KubeletSocket) +type sourcesReadyStub struct{} + +func (s *sourcesReadyStub) AddSource(source string) {} +func (s *sourcesReadyStub) AllReady() bool { return true } + +// NewManagerImpl creates a new manager. 
+func NewManagerImpl() (*ManagerImpl, error) { + return newManagerImpl(pluginapi.KubeletSocket) } -func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) (*ManagerImpl, error) { +func newManagerImpl(socketPath string) (*ManagerImpl, error) { glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath) if socketPath == "" || !filepath.IsAbs(socketPath) { @@ -97,36 +106,38 @@ func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) allocatedDevices: make(map[string]sets.String), podDevices: make(podDevices), } + manager.callback = manager.genericDeviceUpdateCallback - manager.callback = func(resourceName string, added, updated, deleted []pluginapi.Device) { - var capacity = v1.ResourceList{} - kept := append(updated, added...) - - manager.mutex.Lock() - defer manager.mutex.Unlock() - - if _, ok := manager.allDevices[resourceName]; !ok { - manager.allDevices[resourceName] = sets.NewString() - } - // For now, Manager only keeps track of healthy devices. - // We can revisit this later when the need comes to track unhealthy devices here. - for _, dev := range kept { - if dev.Health == pluginapi.Healthy { - manager.allDevices[resourceName].Insert(dev.ID) - } else { - manager.allDevices[resourceName].Delete(dev.ID) - } - } - for _, dev := range deleted { - manager.allDevices[resourceName].Delete(dev.ID) - } - capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(manager.allDevices[resourceName].Len()), resource.DecimalSI) - updateCapacityFunc(capacity) - } + // The following fields are populated with real implementations in manager.Start(). + // Before that, they are initialized to perform no-op operations. + manager.activePods = func() []*v1.Pod { return []*v1.Pod{} } + manager.sourcesReady = &sourcesReadyStub{} return manager, nil } +func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) { + kept := append(updated, added...) + m.mutex.Lock() + if _, ok := m.allDevices[resourceName]; !ok { + m.allDevices[resourceName] = sets.NewString() + } + // For now, Manager only keeps track of healthy devices. + // TODO: add support to track unhealthy devices. + for _, dev := range kept { + if dev.Health == pluginapi.Healthy { + m.allDevices[resourceName].Insert(dev.ID) + } else { + m.allDevices[resourceName].Delete(dev.ID) + } + } + for _, dev := range deleted { + m.allDevices[resourceName].Delete(dev.ID) + } + m.mutex.Unlock() + m.writeCheckpoint() +} + func (m *ManagerImpl) removeContents(dir string) error { d, err := os.Open(dir) if err != nil { @@ -171,10 +182,11 @@ func (m *ManagerImpl) checkpointFile() string { // Start starts the Device Plugin Manager and starts initialization of // podDevices and allocatedDevices information from checkpoint-ed state and // starts device plugin registration service. -func (m *ManagerImpl) Start(activePods ActivePodsFunc) error { +func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { glog.V(2).Infof("Starting Device Plugin manager") m.activePods = activePods + m.sourcesReady = sourcesReady // Loads in allocatedDevices information from disk.
err := m.readCheckpoint() @@ -238,6 +250,9 @@ func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P } } + m.mutex.Lock() + defer m.mutex.Unlock() + // quick return if no pluginResources requested if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource { return nil @@ -334,8 +349,6 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { if old, ok := m.endpoints[r.ResourceName]; ok && old == e { glog.V(2).Infof("Delete resource for endpoint %v", e) delete(m.endpoints, r.ResourceName) - // Issues callback to delete all of devices. - e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices()) } glog.V(2).Infof("Unregistered endpoint %v", e) @@ -343,10 +356,56 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { }() } +// GetCapacity is expected to be called when Kubelet updates its node status. +// The first returned variable contains the registered device plugin resource capacity. +// The second returned variable contains previously registered resources that are no longer active. +// Kubelet uses this information to update resource capacity/allocatable in its node status. +// After the call, the device plugin can remove the inactive resources from its internal list as the +// change is already reflected in Kubelet node status. +// Note that in the special case after Kubelet restarts, device plugin resource capacities can +// temporarily drop to zero until the corresponding device plugins re-register. This is OK because +// cm.UpdatePluginResource(), which runs during predicate Admit, guarantees that we adjust nodeinfo +// capacity for already allocated pods so that they can continue to run. However, new pods +// requiring device plugin resources will not be scheduled until the device plugin re-registers. +func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) { + needsUpdateCheckpoint := false + var capacity = v1.ResourceList{} + var deletedResources []string + m.mutex.Lock() + for resourceName, devices := range m.allDevices { + if _, ok := m.endpoints[resourceName]; !ok { + delete(m.allDevices, resourceName) + deletedResources = append(deletedResources, resourceName) + needsUpdateCheckpoint = true + } else { + capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI) + } + } + m.mutex.Unlock() + if needsUpdateCheckpoint { + m.writeCheckpoint() + } + return capacity, deletedResources +} + +// checkpointData struct is used to store pod to device allocation information +// and registered device information in a checkpoint file. +// TODO: add version control when we need to change checkpoint format. +type checkpointData struct { + PodDeviceEntries []podDevicesCheckpointEntry + RegisteredDevices map[string][]string +} + // Checkpoints device to container allocation information to disk.
func (m *ManagerImpl) writeCheckpoint() error { m.mutex.Lock() - data := m.podDevices.toCheckpointData() + data := checkpointData{ + PodDeviceEntries: m.podDevices.toCheckpointData(), + RegisteredDevices: make(map[string][]string), + } + for resource, devices := range m.allDevices { + data.RegisteredDevices[resource] = devices.UnsortedList() + } m.mutex.Unlock() dataJSON, err := json.Marshal(data) @@ -373,14 +432,23 @@ func (m *ManagerImpl) readCheckpoint() error { m.mutex.Lock() defer m.mutex.Unlock() - m.podDevices.fromCheckpointData(data) + m.podDevices.fromCheckpointData(data.PodDeviceEntries) m.allocatedDevices = m.podDevices.devices() + for resource, devices := range data.RegisteredDevices { + m.allDevices[resource] = sets.NewString() + for _, dev := range devices { + m.allDevices[resource].Insert(dev) + } + } return nil } // updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to // terminated pods. Returns error on failure. func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) { + if !m.sourcesReady.AllReady() { + return + } m.mutex.Lock() defer m.mutex.Unlock() activePodUids := sets.NewString() @@ -392,7 +460,7 @@ func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) { if len(podsToBeRemoved) <= 0 { return } - glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List()) + glog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List()) m.podDevices.delete(podsToBeRemoved.List()) // Regenerated allocatedDevices after we update pod allocation information. m.allocatedDevices = m.podDevices.devices() @@ -420,6 +488,11 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi // No change, no work. return nil, nil } + glog.V(3).Infof("Needs to allocate %v %v for pod %q container %q", needed, resource, podUID, contName) + // Needs to allocate additional devices. + if _, ok := m.allDevices[resource]; !ok { + return nil, fmt.Errorf("can't allocate unregistered device %v", resource) + } devices = sets.NewString() // Needs to allocate additional devices. if m.allocatedDevices[resource] == nil { @@ -455,7 +528,11 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont resource := string(k) needed := int(v.Value()) glog.V(3).Infof("needs %d %s", needed, resource) - if _, registeredResource := m.allDevices[resource]; !registeredResource { + _, registeredResource := m.allDevices[resource] + _, allocatedResource := m.allocatedDevices[resource] + // Continues if this is neither an active device plugin resource nor + // a resource we have previously allocated. + if !registeredResource && !allocatedResource { continue } // Updates allocatedDevices to garbage collect any stranded resources diff --git a/pkg/kubelet/cm/deviceplugin/manager_stub.go b/pkg/kubelet/cm/deviceplugin/manager_stub.go index 450164b2ee8..903a0077a2c 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_stub.go +++ b/pkg/kubelet/cm/deviceplugin/manager_stub.go @@ -19,6 +19,7 @@ package deviceplugin import ( "k8s.io/api/core/v1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -32,7 +33,7 @@ func NewManagerStub() (*ManagerStub, error) { } // Start simply returns nil. 
-func (h *ManagerStub) Start(activePods ActivePodsFunc) error { +func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { return nil } @@ -55,3 +56,8 @@ func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { return nil } + +// GetCapacity simply returns nil capacity and empty removed resource list. +func (h *ManagerStub) GetCapacity() (v1.ResourceList, []string) { + return nil, []string{} +} diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go index 20fb4275739..9a74ec93b50 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_test.go +++ b/pkg/kubelet/cm/deviceplugin/manager_test.go @@ -43,8 +43,7 @@ const ( ) func TestNewManagerImpl(t *testing.T) { - verifyCapacityFunc := func(updates v1.ResourceList) {} - _, err := newManagerImpl(verifyCapacityFunc, socketName) + _, err := newManagerImpl(socketName) require.NoError(t, err) } @@ -123,8 +122,7 @@ func TestDevicePluginReRegistration(t *testing.T) { } func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) { - updateCapacity := func(v1.ResourceList) {} - m, err := newManagerImpl(updateCapacity, socketName) + m, err := newManagerImpl(socketName) require.NoError(t, err) m.callback = callback @@ -132,7 +130,7 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Ma activePods := func() []*v1.Pod { return []*v1.Pod{} } - err = m.Start(activePods) + err = m.Start(activePods, &sourcesReadyStub{}) require.NoError(t, err) p := NewDevicePluginStub(devs, pluginSocketName) @@ -148,12 +146,8 @@ func cleanup(t *testing.T, m Manager, p *Stub) { } func TestUpdateCapacity(t *testing.T) { - var expected = v1.ResourceList{} + testManager, err := newManagerImpl(socketName) as := assert.New(t) - verifyCapacityFunc := func(updates v1.ResourceList) { - as.Equal(expected, updates) - } - testManager, err := newManagerImpl(verifyCapacityFunc, socketName) as.NotNil(testManager) as.Nil(err) @@ -162,23 +156,68 @@ func TestUpdateCapacity(t *testing.T) { {ID: "Device2", Health: pluginapi.Healthy}, {ID: "Device3", Health: pluginapi.Unhealthy}, } + callback := testManager.genericDeviceUpdateCallback - resourceName := "resource1" // Adds three devices for resource1, two healthy and one unhealthy. // Expects capacity for resource1 to be 2. - expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testManager.callback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{}) + resourceName1 := "domain1.com/resource1" + testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)} + callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{}) + capacity, removedResources := testManager.GetCapacity() + resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] + as.True(ok) + as.Equal(int64(2), resource1Capacity.Value()) + as.Equal(0, len(removedResources)) + // Deletes an unhealthy device should NOT change capacity. 
- testManager.callback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) + callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) + capacity, removedResources = testManager.GetCapacity() + resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] + as.True(ok) + as.Equal(int64(2), resource1Capacity.Value()) + as.Equal(0, len(removedResources)) + // Updates a healthy device to unhealthy should reduce capacity by 1. - expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI) + dev2 := devs[1] + dev2.Health = pluginapi.Unhealthy + callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{}) + capacity, removedResources = testManager.GetCapacity() + resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] + as.True(ok) + as.Equal(int64(1), resource1Capacity.Value()) + as.Equal(0, len(removedResources)) + // Deletes a healthy device should reduce capacity by 1. - expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI) + callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]}) + capacity, removedResources = testManager.GetCapacity() + resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] + as.True(ok) + as.Equal(int64(0), resource1Capacity.Value()) + as.Equal(0, len(removedResources)) + // Tests adding another resource. - delete(expected, v1.ResourceName(resourceName)) resourceName2 := "resource2" - expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testManager.callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) + testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)} + callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) + capacity, removedResources = testManager.GetCapacity() + as.Equal(2, len(capacity)) + resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)] + as.True(ok) + as.Equal(int64(2), resource2Capacity.Value()) + as.Equal(0, len(removedResources)) + + // Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1 + // is removed from capacity and it no longer exists in allDevices after the call. 
+ delete(testManager.endpoints, resourceName1) + capacity, removed := testManager.GetCapacity() + as.Equal([]string{resourceName1}, removed) + _, ok = capacity[v1.ResourceName(resourceName1)] + as.False(ok) + val, ok := capacity[v1.ResourceName(resourceName2)] + as.True(ok) + as.Equal(int64(2), val.Value()) + _, ok = testManager.allDevices[resourceName1] + as.False(ok) } type stringPairType struct { @@ -245,8 +284,19 @@ func TestCheckpoint(t *testing.T) { constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}, map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testManager.allDevices[resourceName1] = sets.NewString() + testManager.allDevices[resourceName1].Insert("dev1") + testManager.allDevices[resourceName1].Insert("dev2") + testManager.allDevices[resourceName1].Insert("dev3") + testManager.allDevices[resourceName1].Insert("dev4") + testManager.allDevices[resourceName1].Insert("dev5") + testManager.allDevices[resourceName2] = sets.NewString() + testManager.allDevices[resourceName2].Insert("dev1") + testManager.allDevices[resourceName2].Insert("dev2") + expectedPodDevices := testManager.podDevices expectedAllocatedDevices := testManager.podDevices.devices() + expectedAllDevices := testManager.allDevices err := testManager.writeCheckpoint() as := assert.New(t) @@ -272,6 +322,7 @@ func TestCheckpoint(t *testing.T) { } } as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices)) + as.True(reflect.DeepEqual(expectedAllDevices, testManager.allDevices)) } type activePodsStub struct { @@ -341,6 +392,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { endpoints: make(map[string]endpoint), podDevices: make(podDevices), activePods: podsStub.getActivePods, + sourcesReady: &sourcesReadyStub{}, } testManager.allDevices[resourceName1] = sets.NewString() diff --git a/pkg/kubelet/cm/deviceplugin/pod_devices.go b/pkg/kubelet/cm/deviceplugin/pod_devices.go index e903dbf1035..495a5729879 100644 --- a/pkg/kubelet/cm/deviceplugin/pod_devices.go +++ b/pkg/kubelet/cm/deviceplugin/pod_devices.go @@ -94,7 +94,8 @@ func (pdev podDevices) devices() map[string]sets.String { return ret } -type checkpointEntry struct { +// podDevicesCheckpointEntry is used to record pod to device allocation information. +type podDevicesCheckpointEntry struct { PodUID string ContainerName string ResourceName string @@ -102,16 +103,9 @@ type checkpointEntry struct { AllocResp []byte } -// checkpointData struct is used to store pod to device allocation information -// in a checkpoint file. -// TODO: add version control when we need to change checkpoint format. -type checkpointData struct { - Entries []checkpointEntry -} - // Turns podDevices to checkpointData.
-func (pdev podDevices) toCheckpointData() checkpointData { - var data checkpointData +func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { + var data []podDevicesCheckpointEntry for podUID, containerDevices := range pdev { for conName, resources := range containerDevices { for resource, devices := range resources { @@ -126,7 +120,7 @@ func (pdev podDevices) toCheckpointData() checkpointData { glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) continue } - data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resource, devIds, allocResp}) + data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp}) } } } @@ -134,8 +128,8 @@ func (pdev podDevices) toCheckpointData() checkpointData { } // Populates podDevices from the passed in checkpointData. -func (pdev podDevices) fromCheckpointData(data checkpointData) { - for _, entry := range data.Entries { +func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) { + for _, entry := range data { glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) devIDs := sets.NewString() diff --git a/pkg/kubelet/cm/deviceplugin/types.go b/pkg/kubelet/cm/deviceplugin/types.go index 4b1607a71dc..c4465a8be4c 100644 --- a/pkg/kubelet/cm/deviceplugin/types.go +++ b/pkg/kubelet/cm/deviceplugin/types.go @@ -19,6 +19,7 @@ package deviceplugin import ( "k8s.io/api/core/v1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" @@ -27,7 +28,7 @@ import ( // Manager manages all the Device Plugins running on a node. type Manager interface { // Start starts device plugin registration service. - Start(activePods ActivePodsFunc) error + Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error // Devices is the map of devices that have registered themselves // against the manager. @@ -51,6 +52,10 @@ type Manager interface { // for the passed-in and returns its DeviceRunContainerOptions // for the found one. An empty struct is returned in case no cached state is found. GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions + + // GetCapacity returns the amount of available device plugin resource capacity + // and inactive device plugin resources previously registered on the node. + GetCapacity() (v1.ResourceList, []string) } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. 
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 025340803b1..72194ded315 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1304,7 +1304,7 @@ func (kl *Kubelet) initializeModules() error { return fmt.Errorf("Kubelet failed to get node info: %v", err) } - if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil { + if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil { return fmt.Errorf("Failed to start ContainerManager %v", err) } diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 08555c7d145..7e09c16815f 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -601,15 +601,17 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { } } - currentCapacity := kl.containerManager.GetCapacity() - if currentCapacity != nil { - for k, v := range currentCapacity { - if v1helper.IsExtendedResourceName(k) { - glog.V(2).Infof("Update capacity for %s to %d", k, v.Value()) - node.Status.Capacity[k] = v - } + devicePluginCapacity, removedDevicePlugins := kl.containerManager.GetDevicePluginResourceCapacity() + if devicePluginCapacity != nil { + for k, v := range devicePluginCapacity { + glog.V(2).Infof("Update capacity for %s to %d", k, v.Value()) + node.Status.Capacity[k] = v } } + for _, removedResource := range removedDevicePlugins { + glog.V(2).Infof("Remove capacity for %s", removedResource) + delete(node.Status.Capacity, v1.ResourceName(removedResource)) + } } // Set Allocatable. diff --git a/test/e2e_node/gpu_device_plugin.go b/test/e2e_node/gpu_device_plugin.go index 5fe124f4fb0..4a3927218bf 100644 --- a/test/e2e_node/gpu_device_plugin.go +++ b/test/e2e_node/gpu_device_plugin.go @@ -18,7 +18,6 @@ package e2e_node import ( "os/exec" - "path/filepath" "regexp" "time" @@ -49,6 +48,7 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi initialConfig.FeatureGates[string(features.DevicePlugins)] = true }) + var devicePluginPod *v1.Pod BeforeEach(func() { By("Ensuring that Nvidia GPUs exists on the node") if !checkIfNvidiaGPUsExistOnNode() { @@ -56,7 +56,7 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi } By("Creating the Google Device Plugin pod for NVIDIA GPU in GKE") - f.PodClient().CreateSync(framework.NVIDIADevicePlugin(f.Namespace.Name)) + devicePluginPod = f.PodClient().CreateSync(framework.NVIDIADevicePlugin(f.Namespace.Name)) By("Waiting for GPUs to become available on the local node") Eventually(func() bool { @@ -84,7 +84,7 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi It("checks that when Kubelet restarts exclusive GPU assignation to pods is kept.", func() { By("Creating one GPU pod on a node with at least two GPUs") p1 := f.PodClient().CreateSync(makeCudaPauseImage()) - devId1 := getDeviceId(f, p1.Name, p1.Name, 1) + count1, devId1 := getDeviceId(f, p1.Name, p1.Name, 1) p1, err := f.PodClient().Get(p1.Name, metav1.GetOptions{}) framework.ExpectNoError(err) @@ -92,17 +92,36 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi restartKubelet(f) By("Confirming that after a kubelet and pod restart, GPU assignement is kept") - devIdRestart := getDeviceId(f, p1.Name, p1.Name, 2) - Expect(devIdRestart).To(Equal(devId1)) + count1, devIdRestart1 := getDeviceId(f, p1.Name, p1.Name, count1+1) + 
Expect(devIdRestart1).To(Equal(devId1)) By("Restarting Kubelet and creating another pod") restartKubelet(f) p2 := f.PodClient().CreateSync(makeCudaPauseImage()) By("Checking that pods got a different GPU") - devId2 := getDeviceId(f, p2.Name, p2.Name, 1) + count2, devId2 := getDeviceId(f, p2.Name, p2.Name, 1) Expect(devId1).To(Not(Equal(devId2))) + By("Deleting device plugin.") + f.PodClient().Delete(devicePluginPod.Name, &metav1.DeleteOptions{}) + By("Waiting for GPUs to become unavailable on the local node") + Eventually(func() bool { + return framework.NumberOfNVIDIAGPUs(getLocalNode(f)) <= 0 + }, 10*time.Minute, framework.Poll).Should(BeTrue()) + By("Checking that scheduled pods can continue to run even after we delete device plugin.") + count1, devIdRestart1 = getDeviceId(f, p1.Name, p1.Name, count1+1) + Expect(devIdRestart1).To(Equal(devId1)) + count2, devIdRestart2 := getDeviceId(f, p2.Name, p2.Name, count2+1) + Expect(devIdRestart2).To(Equal(devId2)) + By("Restarting Kubelet.") + restartKubelet(f) + By("Checking that scheduled pods can continue to run even after we delete device plugin and restart Kubelet.") + count1, devIdRestart1 = getDeviceId(f, p1.Name, p1.Name, count1+2) + Expect(devIdRestart1).To(Equal(devId1)) + count2, devIdRestart2 = getDeviceId(f, p2.Name, p2.Name, count2+2) + Expect(devIdRestart2).To(Equal(devId2)) + // Cleanup f.PodClient().DeleteSync(p1.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) f.PodClient().DeleteSync(p2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) @@ -140,9 +159,6 @@ func newDecimalResourceList(name v1.ResourceName, quantity int64) v1.ResourceLis // TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494 func restartKubelet(f *framework.Framework) { - beforeSocks, err := filepath.Glob("/var/lib/kubelet/device-plugins/nvidiaGPU*.sock") - framework.ExpectNoError(err) - Expect(len(beforeSocks)).NotTo(BeZero()) stdout, err := exec.Command("sudo", "systemctl", "list-units", "kubelet*", "--state=running").CombinedOutput() framework.ExpectNoError(err) regex := regexp.MustCompile("(kubelet-[0-9]+)") @@ -152,19 +168,18 @@ func restartKubelet(f *framework.Framework) { framework.Logf("Get running kubelet with systemctl: %v, %v", string(stdout), kube) stdout, err = exec.Command("sudo", "systemctl", "restart", kube).CombinedOutput() framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %v", err, stdout) - Eventually(func() ([]string, error) { - return filepath.Glob("/var/lib/kubelet/device-plugins/nvidiaGPU*.sock") - }, 5*time.Minute, framework.Poll).ShouldNot(ConsistOf(beforeSocks)) } -func getDeviceId(f *framework.Framework, podName string, contName string, restartCount int32) string { +func getDeviceId(f *framework.Framework, podName string, contName string, restartCount int32) (int32, string) { + var count int32 // Wait till pod has been restarted at least restartCount times. 
Eventually(func() bool { p, err := f.PodClient().Get(podName, metav1.GetOptions{}) if err != nil || len(p.Status.ContainerStatuses) < 1 { return false } - return p.Status.ContainerStatuses[0].RestartCount >= restartCount + count = p.Status.ContainerStatuses[0].RestartCount + return count >= restartCount }, 5*time.Minute, framework.Poll).Should(BeTrue()) logs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, podName, contName) if err != nil { @@ -174,7 +189,7 @@ func getDeviceId(f *framework.Framework, podName string, contName string, restar regex := regexp.MustCompile("gpu devices: (nvidia[0-9]+)") matches := regex.FindStringSubmatch(logs) if len(matches) < 2 { - return "" + return count, "" } - return matches[1] + return count, matches[1] }
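For reference, a minimal standalone Go sketch (not part of the patch) of what the new checkpoint layout serializes to, assuming the checkpointData and podDevicesCheckpointEntry definitions added above; the pod UID, container name, resource name, and device IDs below are hypothetical values:

package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative copies of the structs introduced in this diff.
type podDevicesCheckpointEntry struct {
	PodUID        string
	ContainerName string
	ResourceName  string
	DeviceIDs     []string
	AllocResp     []byte
}

type checkpointData struct {
	PodDeviceEntries  []podDevicesCheckpointEntry
	RegisteredDevices map[string][]string
}

func main() {
	// One allocated container plus the set of registered devices, roughly as the
	// new writeCheckpoint() would record them.
	data := checkpointData{
		PodDeviceEntries: []podDevicesCheckpointEntry{
			{
				PodUID:        "pod-uid-1",
				ContainerName: "con1",
				ResourceName:  "domain1.com/resource1",
				DeviceIDs:     []string{"dev1", "dev2"},
			},
		},
		RegisteredDevices: map[string][]string{
			"domain1.com/resource1": {"dev1", "dev2", "dev3"},
		},
	}
	out, err := json.Marshal(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}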