Merge pull request #79409 from takmatsu/add-phase

Modify Kubelet Pod Resources API to get only active pods
commit d09f8b9d54
Author: Kubernetes Prow Robot
Date:   2020-02-08 16:09:52 -08:00 (committed by GitHub)
10 changed files with 36 additions and 6 deletions

pkg/kubelet/apis/podresources/server.go

@@ -26,6 +26,7 @@ import (
 // DevicesProvider knows how to provide the devices used by the given container
 type DevicesProvider interface {
 	GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices
+	UpdateAllocatedDevices()
 }
 
 // PodsProvider knows how to provide the pods admitted by the node
@@ -52,6 +53,7 @@ func NewPodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesPro
 func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest) (*v1alpha1.ListPodResourcesResponse, error) {
 	pods := p.podsProvider.GetPods()
 	podResources := make([]*v1alpha1.PodResources, len(pods))
+	p.devicesProvider.UpdateAllocatedDevices()
 	for i, pod := range pods {
 		pRes := v1alpha1.PodResources{
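
Taken together, the two hunks above make List reconcile device allocations before reporting: allocations held by terminated pods are released first, so they no longer appear in the response. A minimal runnable sketch of that flow, using hypothetical fake providers rather than the kubelet's real types:

```go
package main

import "fmt"

// fakeDevicesProvider and fakePodsProvider are illustrative stand-ins,
// not the kubelet's real implementations.
type fakeDevicesProvider struct{ reconciled bool }

// UpdateAllocatedDevices records that stale allocations were released.
func (f *fakeDevicesProvider) UpdateAllocatedDevices() { f.reconciled = true }

type fakePodsProvider struct{ pods []string }

func (f *fakePodsProvider) GetPods() []string { return f.pods }

// list mirrors the shape of podResourcesServer.List: reconcile device
// allocations against active pods before building the response.
func list(p *fakePodsProvider, d *fakeDevicesProvider) []string {
	pods := p.GetPods()
	d.UpdateAllocatedDevices() // the call this commit adds
	return pods
}

func main() {
	d := &fakeDevicesProvider{}
	fmt.Println(list(&fakePodsProvider{pods: []string{"pod-a"}}, d), d.reconciled) // [pod-a] true
}
```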

pkg/kubelet/apis/podresources/server_test.go

@@ -42,6 +42,10 @@ func (m *mockProvider) GetDevices(podUID, containerName string) []*v1alpha1.Cont
 	return args.Get(0).([]*v1alpha1.ContainerDevices)
 }
 
+func (m *mockProvider) UpdateAllocatedDevices() {
+	m.Called()
+}
+
 func TestListPodResources(t *testing.T) {
 	podName := "pod-name"
 	podNamespace := "pod-namespace"
@@ -140,6 +144,7 @@ func TestListPodResources(t *testing.T) {
 			m := new(mockProvider)
 			m.On("GetPods").Return(tc.pods)
 			m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
+			m.On("UpdateAllocatedDevices").Return()
 			server := NewPodResourcesServer(m, m)
 			resp, err := server.List(context.TODO(), &v1alpha1.ListPodResourcesRequest{})
 			if err != nil {
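
Because List now calls UpdateAllocatedDevices unconditionally, any testify mock standing in for the providers must register an expectation for it, as the added m.On(...) line does; otherwise the mock panics on the unexpected call. A standalone sketch of that pattern (names here are illustrative, not the real test's):

```go
package podresources_test

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// providerMock embeds mock.Mock, as the real test's mockProvider does.
type providerMock struct{ mock.Mock }

func (m *providerMock) UpdateAllocatedDevices() { m.Called() }

func TestUpdateAllocatedDevicesExpectation(t *testing.T) {
	m := new(providerMock)
	m.On("UpdateAllocatedDevices").Return() // required now that List calls it

	m.UpdateAllocatedDevices() // stands in for server.List(...) triggering the call

	m.AssertExpectations(t) // fails if the expectation was never met
}
```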

pkg/kubelet/cm/container_manager.go

@@ -113,6 +113,9 @@ type ContainerManager interface {
 	// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
 	GetTopologyPodAdmitHandler() topologymanager.Manager
+
+	// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
+	UpdateAllocatedDevices()
 }
 
 type NodeConfig struct {
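
Adding the method to the ContainerManager interface is what lets the kubelet hand its container manager straight to the pod-resources server as a DevicesProvider. A pared-down illustration of that interface relationship, with both interfaces reduced to hypothetical stand-ins:

```go
package main

// DevicesProvider is a reduced stand-in for the pod-resources server's
// provider interface; the real one also requires GetDevices.
type DevicesProvider interface {
	UpdateAllocatedDevices()
}

type containerManagerStub struct{}

func (containerManagerStub) UpdateAllocatedDevices() {}

// Compile-time assertion: with the new method implemented, the container
// manager can be used wherever a DevicesProvider is expected.
var _ DevicesProvider = containerManagerStub{}

func main() {}
```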

pkg/kubelet/cm/container_manager_linux.go

@@ -946,3 +946,7 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr
 func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
 	return cm.deviceManager.ShouldResetExtendedResourceCapacity()
 }
+
+func (cm *containerManagerImpl) UpdateAllocatedDevices() {
+	cm.deviceManager.UpdateAllocatedDevices()
+}

pkg/kubelet/cm/container_manager_stub.go

@@ -121,6 +121,10 @@ func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Man
 	return nil
 }
 
+func (cm *containerManagerStub) UpdateAllocatedDevices() {
+	return
+}
+
 func NewStubContainerManager() ContainerManager {
 	return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
 }

pkg/kubelet/cm/container_manager_windows.go

@@ -181,3 +181,7 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
 func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
 	return nil
 }
+
+func (cm *containerManagerImpl) UpdateAllocatedDevices() {
+	return
+}

pkg/kubelet/cm/devicemanager/manager.go

@@ -597,9 +597,9 @@ func (m *ManagerImpl) readCheckpoint() error {
 	return nil
 }
 
-// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
-// terminated pods. Returns error on failure.
-func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
+// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
+func (m *ManagerImpl) UpdateAllocatedDevices() {
+	activePods := m.activePods()
 	if !m.sourcesReady.AllReady() {
 		return
 	}
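
The rename from updateAllocatedDevices to UpdateAllocatedDevices exports the method, and fetching activePods internally removes the parameter, so callers outside the package need no pod list. The garbage-collection body itself is unchanged by this commit and not shown in the hunk; a simplified sketch of what it does, with types pared down for illustration:

```go
package main

import "fmt"

// manager is a pared-down stand-in for ManagerImpl.
type manager struct {
	activePods func() []string     // active pod UIDs, as m.activePods() supplies
	allocated  map[string][]string // pod UID -> allocated device IDs
}

// UpdateAllocatedDevices frees allocations owned by pods that are no
// longer active, mirroring the intent of the real method's body.
func (m *manager) UpdateAllocatedDevices() {
	active := make(map[string]bool)
	for _, uid := range m.activePods() {
		active[uid] = true
	}
	for uid := range m.allocated {
		if !active[uid] {
			delete(m.allocated, uid) // terminated pod: release its devices
		}
	}
}

func main() {
	m := &manager{
		activePods: func() []string { return []string{"pod-a"} },
		allocated:  map[string][]string{"pod-a": {"dev0"}, "pod-b": {"dev1"}},
	}
	m.UpdateAllocatedDevices()
	fmt.Println(m.allocated) // map[pod-a:[dev0]]
}
```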
@@ -773,7 +773,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 		// Updates allocatedDevices to garbage collect any stranded resources
 		// before doing the device plugin allocation.
 		if !allocatedDevicesUpdated {
-			m.updateAllocatedDevices(m.activePods())
+			m.UpdateAllocatedDevices()
 			allocatedDevicesUpdated = true
 		}
 		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
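
The allocatedDevicesUpdated flag above keeps the relatively expensive garbage-collection pass to at most one run per allocation request, even when a container asks for several resource types. A condensed, runnable illustration of that guard (the loop body is hypothetical):

```go
package main

import "fmt"

// allocate shows the lazy, once-per-call guard from the hunk above: the
// GC pass runs at most once, regardless of how many resources are requested.
func allocate(resources []string, gc func()) {
	updated := false
	for _, r := range resources {
		if !updated {
			gc() // m.UpdateAllocatedDevices() in the real code
			updated = true
		}
		fmt.Println("allocating", r)
	}
}

func main() {
	calls := 0
	allocate([]string{"gpu", "nic"}, func() { calls++ })
	fmt.Println("gc ran", calls, "time(s)") // gc ran 1 time(s)
}
```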
@@ -788,7 +788,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 		// Manager.Allocate involves RPC calls to device plugin, which
 		// could be heavy-weight. Therefore we want to perform this operation outside
 		// mutex lock. Note if Allocate call fails, we may leave container resources
-		// partially allocated for the failed container. We rely on updateAllocatedDevices()
+		// partially allocated for the failed container. We rely on UpdateAllocatedDevices()
 		// to garbage collect these resources later. Another side effect is that if
 		// we have X resource A and Y resource B in total, and two containers, container1
 		// and container2 both require X resource A and Y resource B. Both allocation

pkg/kubelet/cm/devicemanager/manager_stub.go

@@ -78,3 +78,8 @@ func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevice
 func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
 	return false
 }
+
+// UpdateAllocatedDevices returns nothing
+func (h *ManagerStub) UpdateAllocatedDevices() {
+	return
+}

pkg/kubelet/cm/devicemanager/topology_hints.go

@@ -29,7 +29,7 @@ import (
 // container are created.
 func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
 	// Garbage collect any stranded device resources before providing TopologyHints
-	m.updateAllocatedDevices(m.activePods())
+	m.UpdateAllocatedDevices()
 
 	// Loop through all device resources and generate TopologyHints for them..
 	deviceHints := make(map[string][]topologymanager.TopologyHint)
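
The GC-before-hints ordering above matters because hint generation works from the set of devices not currently allocated, so stale allocations from terminated pods would shrink the candidate set. A compact sketch of that dependency, with hypothetical reduced types:

```go
package main

import "fmt"

// freeDevices returns devices not currently allocated; this is the set
// hint generation draws from in the reduced illustration.
func freeDevices(all, allocated map[string]bool) []string {
	var free []string
	for d := range all {
		if !allocated[d] {
			free = append(free, d)
		}
	}
	return free
}

func main() {
	all := map[string]bool{"dev0": true, "dev1": true}
	allocated := map[string]bool{"dev1": true} // dev1 held by a terminated pod
	fmt.Println(freeDevices(all, allocated))   // without GC: only [dev0]
	delete(allocated, "dev1")                  // after UpdateAllocatedDevices
	fmt.Println(freeDevices(all, allocated))   // with GC: both devices (map order varies)
}
```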

pkg/kubelet/cm/devicemanager/types.go

@@ -68,6 +68,9 @@ type Manager interface {
 	// TopologyManager HintProvider provider indicates the Device Manager implements the Topology Manager Interface
 	// and is consulted to make Topology aware resource alignments
 	GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
+
+	// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
+	UpdateAllocatedDevices()
 }
 
 // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
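
With the method on the device manager's Manager interface as well, the call chain is complete end to end: the pod-resources server's List triggers the container manager, which delegates to the device manager, as the containerManagerImpl hunk showed. A reduced sketch of that delegation chain, with all types as illustrative stand-ins:

```go
package main

import "fmt"

// deviceManager and containerManager are reduced stand-ins; the point
// is the delegation shape this commit wires up.
type deviceManager struct{}

func (deviceManager) UpdateAllocatedDevices() {
	fmt.Println("device manager: freeing devices of terminated pods")
}

type containerManager struct{ dm deviceManager }

// Mirrors containerManagerImpl.UpdateAllocatedDevices: pure delegation.
func (c containerManager) UpdateAllocatedDevices() { c.dm.UpdateAllocatedDevices() }

func main() {
	var cm interface{ UpdateAllocatedDevices() } = containerManager{}
	cm.UpdateAllocatedDevices() // what the pod-resources server now triggers before listing
}
```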