Merge pull request #87759 from klueska/upstream-move-cpu-allocation-to-pod-admit

Guarantee aligned resources across containers
Kubernetes Prow Robot 2020-03-04 20:12:37 -08:00 committed by GitHub
commit ac32644d6e
19 changed files with 282 additions and 157 deletions

View File

@ -25,7 +25,6 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -111,8 +110,8 @@ type ContainerManager interface {
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool
// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
GetTopologyPodAdmitHandler() topologymanager.Manager
// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()

View File

@ -672,11 +672,51 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
}
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.Allocate(node, attrs)
return cm.deviceManager.UpdatePluginResources(node, attrs)
}
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
return cm.topologyManager
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
return cm.topologyManager
}
// TODO: we need to think about a better way to do this. This will work for
// now so long as we have only the cpuManager and deviceManager relying on
// allocations here. However, going forward it is not generalized enough to
// work as we add more and more hint providers that the TopologyManager
// needs to call Allocate() on (that may not be directly instantiated
// inside this component).
return &resourceAllocator{cm.cpuManager, cm.deviceManager}
}
type resourceAllocator struct {
cpuManager cpumanager.Manager
deviceManager devicemanager.Manager
}
func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
pod := attrs.Pod
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := m.deviceManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
err = m.cpuManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
}
return lifecycle.PodAdmitResult{Admit: true}
}
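
The Admit() loop above is the heart of the change: every init and app container is run through each resource manager's Allocate() before the pod is admitted, and the first failure rejects the pod. A minimal, self-contained sketch of the same pattern, using hypothetical stub types in place of the real cpumanager and devicemanager interfaces:

package main

import "fmt"

type container struct{ Name string }

type pod struct {
    InitContainers []container
    Containers     []container
}

// allocator stands in for cpumanager.Manager / devicemanager.Manager here.
type allocator interface {
    Allocate(p *pod, c *container) error
}

type admitResult struct {
    Admit   bool
    Reason  string
    Message string
}

func admit(p *pod, allocators ...allocator) admitResult {
    for _, c := range append(p.InitContainers, p.Containers...) {
        c := c // work on a copy so &c does not alias the loop variable
        for _, a := range allocators {
            if err := a.Allocate(p, &c); err != nil {
                return admitResult{
                    Admit:   false,
                    Reason:  "UnexpectedAdmissionError",
                    Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
                }
            }
        }
    }
    return admitResult{Admit: true}
}

type okAllocator struct{}

func (okAllocator) Allocate(*pod, *container) error { return nil }

func main() {
    p := &pod{
        InitContainers: []container{{Name: "init"}},
        Containers:     []container{{Name: "app"}},
    }
    fmt.Printf("%+v\n", admit(p, okAllocator{}, okAllocator{}))
}

In the actual wiring (see the kubelet change at the end of this diff), the kubelet registers exactly one such handler: the TopologyManager itself when its feature gate is enabled, otherwise this plain resourceAllocator.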
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {

View File

@ -117,8 +117,8 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}
func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager {
return nil
func (cm *containerManagerStub) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
return topologymanager.NewFakeManager()
}
func (cm *containerManagerStub) UpdateAllocatedDevices() {

View File

@ -177,7 +177,7 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return false
}
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
return nil
}

View File

@ -55,6 +55,11 @@ type Manager interface {
// Start is called during Kubelet initialization.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
// Called to trigger the allocation of CPUs to a container. This must be
// called at some point prior to the AddContainer() call for a container,
// e.g. at pod admission time.
Allocate(pod *v1.Pod, container *v1.Container) error
// AddContainer is called between container create and container start
// so that initial CPU affinity settings can be written through to the
// container runtime before the first process begins to execute.
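
To make the new Allocate()/AddContainer() split concrete, here is a self-contained sketch with hypothetical types (not the real cpumanager): Allocate() records a CPU reservation in state at pod admission time, while AddContainer() later only reads that state and writes the cpuset through to the runtime, rolling the reservation back if the write fails.

package main

import (
    "errors"
    "fmt"
)

type manager struct {
    assignments map[string]string // container name -> assigned cpuset (as a string here)
    updateErr   error             // injected runtime failure for the demo
}

// Allocate decides and records the CPU assignment; called at pod admission.
func (m *manager) Allocate(containerName, cpus string) error {
    m.assignments[containerName] = cpus
    return nil
}

// AddContainer only reads the recorded assignment and pushes it to the runtime;
// called between container create and container start.
func (m *manager) AddContainer(containerName, containerID string) error {
    cpus, ok := m.assignments[containerName]
    if !ok {
        return nil // shared pool; nothing to write through
    }
    if err := m.updateContainerCPUSet(containerID, cpus); err != nil {
        delete(m.assignments, containerName) // roll back the reservation
        return err
    }
    return nil
}

func (m *manager) updateContainerCPUSet(id, cpus string) error { return m.updateErr }

func main() {
    m := &manager{assignments: map[string]string{}, updateErr: errors.New("fake update error")}
    _ = m.Allocate("app", "3-4")
    fmt.Println(m.AddContainer("app", "fakeID")) // fake update error
    fmt.Println(m.assignments)                   // map[]: the reservation was rolled back
}

This split is also why the tests below replace the single expErr field with separate expAllocateErr and expAddContainerErr fields: the two failure modes now occur at different points in the pod lifecycle.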
@ -206,39 +211,33 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
return nil
}
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
m.Lock()
// Proactively remove CPUs from init containers that have already run.
// They are guaranteed to have run to completion before any other
// container is run.
for _, initContainer := range p.Spec.InitContainers {
if c.Name != initContainer.Name {
err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
if err != nil {
klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
}
}
}
defer m.Unlock()
// Call down into the policy to assign this container CPUs if required.
err := m.policyAddContainer(p, c, containerID)
err := m.policy.Allocate(m.state, p, c)
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: %v", err)
m.Unlock()
klog.Errorf("[cpumanager] Allocate error: %v", err)
return err
}
// Get the CPUs just assigned to the container (or fall back to the default
// CPUSet if none were assigned).
return nil
}
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
m.Lock()
// Get the CPUs assigned to the container during Allocate()
// (or fall back to the default CPUSet if none were assigned).
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
m.Unlock()
if !cpus.IsEmpty() {
err = m.updateContainerCPUSet(containerID, cpus)
err := m.updateContainerCPUSet(containerID, cpus)
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
m.Lock()
err := m.policyRemoveContainerByID(containerID)
err := m.policyRemoveContainerByRef(string(p.UID), c.Name)
if err != nil {
klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
}
@ -246,6 +245,7 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
}
return err
}
klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
return nil
}
@ -263,14 +263,6 @@ func (m *manager) RemoveContainer(containerID string) error {
return nil
}
func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
err := m.policy.AddContainer(m.state, p, c)
if err == nil {
m.containerMap.Add(string(p.UID), c.Name, containerID)
}
return err
}
func (m *manager) policyRemoveContainerByID(containerID string) error {
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
if err != nil {

View File

@ -104,7 +104,7 @@ func (p *mockPolicy) Start(s state.State) error {
return p.err
}
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
return p.err
}
@ -223,18 +223,20 @@ func TestCPUManagerAdd(t *testing.T) {
cpuset.NewCPUSet(),
topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expErr error
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expAllocateErr error
expAddContainerErr error
}{
{
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(3, 4),
expErr: nil,
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(3, 4),
expAllocateErr: nil,
expAddContainerErr: nil,
},
{
description: "cpu manager add - policy add container error",
@ -242,15 +244,17 @@ func TestCPUManagerAdd(t *testing.T) {
policy: &mockPolicy{
err: fmt.Errorf("fake reg error"),
},
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expErr: fmt.Errorf("fake reg error"),
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expAllocateErr: fmt.Errorf("fake reg error"),
expAddContainerErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expErr: fmt.Errorf("fake update error"),
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expAllocateErr: nil,
expAddContainerErr: fmt.Errorf("fake update error"),
},
}
@ -271,10 +275,16 @@ func TestCPUManagerAdd(t *testing.T) {
pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expAllocateErr, err)
}
err = mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
testCase.description, testCase.expAddContainerErr, err)
}
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
@ -494,7 +504,12 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
testCase.expCSets...)
for i := range containers {
err := mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
err := mgr.Allocate(testCase.pod, &containers[i])
if err != nil {
t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
}
err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
if err != nil {
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
@ -970,25 +985,28 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
cpuset.NewCPUSet(0),
topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expErr error
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expAllocateErr error
expAddContainerErr error
}{
{
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 3),
expErr: nil,
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 3),
expAllocateErr: nil,
expAddContainerErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expErr: fmt.Errorf("fake update error"),
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expAllocateErr: nil,
expAddContainerErr: fmt.Errorf("fake update error"),
},
}
@ -1009,10 +1027,16 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expAllocateErr, err)
}
err = mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
testCase.description, testCase.expAddContainerErr, err)
}
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",

View File

@ -40,6 +40,11 @@ func (m *fakeManager) Policy() Policy {
return NewNonePolicy()
}
func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
klog.Infof("[fake cpumanager] Allocate (pod: %s, container: %s", pod.Name, container.Name)
return nil
}
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
return nil

View File

@ -26,8 +26,8 @@ import (
type Policy interface {
Name() string
Start(s state.State) error
// AddContainer call is idempotent
AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
// Allocate call is idempotent
Allocate(s state.State, pod *v1.Pod, container *v1.Container) error
// RemoveContainer call is idempotent
RemoveContainer(s state.State, podUID string, containerName string) error
// GetTopologyHints implements the topologymanager.HintProvider Interface

View File

@ -44,7 +44,7 @@ func (p *nonePolicy) Start(s state.State) error {
return nil
}
func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *nonePolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
return nil
}

View File

@ -33,7 +33,7 @@ func TestNonePolicyName(t *testing.T) {
}
}
func TestNonePolicyAdd(t *testing.T) {
func TestNonePolicyAllocate(t *testing.T) {
policy := &nonePolicy{}
st := &mockState{
@ -44,9 +44,9 @@ func TestNonePolicyAdd(t *testing.T) {
testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m")
container := &testPod.Spec.Containers[0]
err := policy.AddContainer(st, testPod, container)
err := policy.Allocate(st, testPod, container)
if err != nil {
t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err)
t.Errorf("NonePolicy Allocate() error. expected no error but got: %v", err)
}
}

View File

@ -188,9 +188,9 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s)", pod.Name, container.Name)
klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
// container belongs in an exclusively allocated pool
if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
@ -209,6 +209,17 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
return err
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
// Check if the container that has just been allocated resources is an init container.
// If so, release its CPUs back into the shared pool so they can be reallocated.
for _, initContainer := range pod.Spec.InitContainers {
if container.Name == initContainer.Name {
if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
// Mutate the shared pool, adding released cpus.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
}
}
}
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
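
The init-container branch above is what allows exclusive CPUs granted to an init container to be reused by the app containers that follow it. A toy, self-contained illustration of that bookkeeping (not the real cpuset package):

package main

import "fmt"

type cpuSet map[int]bool

func union(a, b cpuSet) cpuSet {
    out := cpuSet{}
    for c := range a {
        out[c] = true
    }
    for c := range b {
        out[c] = true
    }
    return out
}

func main() {
    shared := cpuSet{0: true, 1: true, 2: true, 3: true}

    // Allocate(): the init container is granted CPUs 2 and 3 exclusively.
    initCPUs := cpuSet{2: true, 3: true}
    for c := range initCPUs {
        delete(shared, c)
    }

    // Because it is an init container, its CPUs go straight back into the
    // shared (default) pool, so the app container can be granted them next.
    shared = union(shared, initCPUs)

    fmt.Println(len(shared), "CPUs available to app containers") // 4
}

Note that the real policy only mutates the shared pool here; the init container's own assignment stays recorded in state, so AddContainer() can still write its cpuset through to the runtime.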

View File

@ -444,26 +444,26 @@ func TestStaticPolicyAdd(t *testing.T) {
}
container := &testCase.pod.Spec.Containers[0]
err := policy.AddContainer(st, testCase.pod, container)
err := policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
if !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
testCase.description, cset, st.defaultCPUSet)
}
}
@ -471,7 +471,7 @@ func TestStaticPolicyAdd(t *testing.T) {
if !testCase.expCPUAlloc {
_, found := st.assignments[string(testCase.pod.UID)][container.Name]
if found {
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v",
t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
}
@ -786,26 +786,26 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
container := &testCase.pod.Spec.Containers[0]
err := policy.AddContainer(st, testCase.pod, container)
err := policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
if !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
testCase.description, cset, st.defaultCPUSet)
}
}
@ -813,7 +813,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
if !testCase.expCPUAlloc {
_, found := st.assignments[string(testCase.pod.UID)][container.Name]
if found {
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v",
t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
}

View File

@ -105,6 +105,10 @@ type ManagerImpl struct {
// Store of Topology Affinities that the Device Manager can query.
topologyAffinityStore topologymanager.Store
// devicesToReuse contains devices that can be reused as they have been allocated to
// init containers.
devicesToReuse PodReusableDevices
}
type endpointInfo struct {
@ -114,6 +118,9 @@ type endpointInfo struct {
type sourcesReadyStub struct{}
// PodReusableDevices maps pod UIDs to the devices that can be reused within that pod.
type PodReusableDevices map[string]map[string]sets.String
func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
@ -147,6 +154,7 @@ func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, to
podDevices: make(podDevices),
numaNodes: numaNodes,
topologyAffinityStore: topologyAffinityStore,
devicesToReuse: make(PodReusableDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
@ -350,32 +358,41 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
return false
}
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
devicesToReuse := make(map[string]sets.String)
for _, container := range pod.Spec.InitContainers {
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
return err
}
m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
}
for _, container := range pod.Spec.Containers {
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
return err
}
m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
}
return nil
}
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
pod := attrs.Pod
err := m.allocatePodResources(pod)
if err != nil {
klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
}
// If m.devicesToReuse contains entries for pods other than the current pod, delete them.
for podUID := range m.devicesToReuse {
if podUID != string(pod.UID) {
delete(m.devicesToReuse, podUID)
}
}
// Allocate resources for init containers first as we know the caller always loops
// through init containers before looping through app containers. Should the caller
// ever change those semantics, this logic will need to be amended.
for _, initContainer := range pod.Spec.InitContainers {
if container.Name == initContainer.Name {
if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
return err
}
m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
return nil
}
}
if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
return err
}
m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
return nil
}
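
A toy, self-contained sketch of the devicesToReuse bookkeeping this method relies on, using plain maps instead of the real types: devices granted to init containers are parked in a per-pod reuse set, app containers are satisfied from that set first, and whatever an app container ends up holding is removed so it is never handed out twice.

package main

import "fmt"

func main() {
    // Keyed the same way the sketch assumes devicesToReuse is: pod UID,
    // then resource name, then a set of device IDs.
    devicesToReuse := map[string]map[string]map[string]bool{}
    podUID := "pod-uid-1"
    devicesToReuse[podUID] = map[string]map[string]bool{}

    // Init container was allocated dev1 of resource "vendor.com/gpu":
    // record it as reusable for later containers in the same pod.
    devicesToReuse[podUID]["vendor.com/gpu"] = map[string]bool{"dev1": true}

    // App container needs one "vendor.com/gpu": satisfy it from the reuse set
    // if possible, otherwise a fresh device would be taken from the plugin.
    granted := ""
    for d := range devicesToReuse[podUID]["vendor.com/gpu"] {
        granted = d
        break
    }
    if granted == "" {
        granted = "dev2" // hypothetical fresh device
    }

    // Once an app container holds a device, drop it from the reuse set.
    delete(devicesToReuse[podUID]["vendor.com/gpu"], granted)

    fmt.Println("granted:", granted, "reuse set:", devicesToReuse[podUID])
}

TestInitContainerDeviceAllocation below exercises exactly this ordering by calling Allocate() for all init containers before the app containers.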
// UpdatePluginResources updates node resources based on devices already allocated to pods.
func (m *ManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
pod := attrs.Pod
m.mutex.Lock()
defer m.mutex.Unlock()
@ -860,8 +877,8 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
}
}
if needsReAllocate {
klog.V(2).Infof("needs re-allocate device plugin resources for pod %s", podUID)
if err := m.allocatePodResources(pod); err != nil {
klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", podUID, container.Name)
if err := m.Allocate(pod, container); err != nil {
return nil, err
}
}

View File

@ -45,7 +45,12 @@ func (h *ManagerStub) Stop() error {
}
// Allocate simply returns nil.
func (h *ManagerStub) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
func (h *ManagerStub) Allocate(pod *v1.Pod, container *v1.Container) error {
return nil
}
// UpdatePluginResources simply returns nil.
func (h *ManagerStub) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return nil
}

View File

@ -30,6 +30,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
@ -604,6 +605,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpointInfo),
podDevices: make(podDevices),
devicesToReuse: make(PodReusableDevices),
topologyAffinityStore: topologymanager.NewFakeManager(),
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
@ -648,17 +650,6 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
return testManager, nil
}
func getTestNodeInfo(allocatable v1.ResourceList) *schedulernodeinfo.NodeInfo {
cachedNode := &v1.Node{
Status: v1.NodeStatus{
Allocatable: allocatable,
},
}
nodeInfo := &schedulernodeinfo.NodeInfo{}
nodeInfo.SetNode(cachedNode)
return nodeInfo
}
type TestResource struct {
resourceName string
resourceQuantity resource.Quantity
@ -686,7 +677,6 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
as.Nil(err)
@ -738,7 +728,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
pod := testCase.testPod
activePods = append(activePods, pod)
podsStub.updateActivePods(activePods)
err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
err := testManager.Allocate(pod, &pod.Spec.Containers[0])
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
@ -780,7 +770,6 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
podsStub := activePodsStub{
activePods: []*v1.Pod{},
}
nodeInfo := getTestNodeInfo(v1.ResourceList{})
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
@ -834,7 +823,12 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
},
}
podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers})
for _, container := range podWithPluginResourcesInInitContainers.Spec.InitContainers {
err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
}
for _, container := range podWithPluginResourcesInInitContainers.Spec.Containers {
err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
}
as.Nil(err)
podUID := string(podWithPluginResourcesInInitContainers.UID)
initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
@ -855,7 +849,10 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
}
func TestSanitizeNodeAllocatable(t *testing.T) {
func TestUpdatePluginResources(t *testing.T) {
pod := &v1.Pod{}
pod.UID = types.UID("testPod")
resourceName1 := "domain1.com/resource1"
devID1 := "dev1"
@ -876,6 +873,8 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
podDevices: make(podDevices),
checkpointManager: ckm,
}
testManager.podDevices[string(pod.UID)] = make(containerDevices)
// require one of resource1 and one of resource2
testManager.allocatedDevices[resourceName1] = sets.NewString()
testManager.allocatedDevices[resourceName1].Insert(devID1)
@ -893,7 +892,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
nodeInfo := &schedulernodeinfo.NodeInfo{}
nodeInfo.SetNode(cachedNode)
testManager.sanitizeNodeAllocatable(nodeInfo)
testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
// allocatable in nodeInfo is less than needed, should update
@ -918,7 +917,6 @@ func TestDevicePreStartContainer(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
as.Nil(err)
@ -936,7 +934,7 @@ func TestDevicePreStartContainer(t *testing.T) {
activePods := []*v1.Pod{}
activePods = append(activePods, pod)
podsStub.updateActivePods(activePods)
err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
err = testManager.Allocate(pod, &pod.Spec.Containers[0])
as.Nil(err)
runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
as.Nil(err)

View File

@ -34,15 +34,17 @@ type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
// Allocate configures and assigns devices to pods. The pods are provided
// through the pod admission attributes in the attrs argument. From the
// requested device resources, Allocate will communicate with the owning
// device plugin to allow setup procedures to take place, and for the
// device plugin to provide runtime settings to use the device (environment
// variables, mount points and device files). The node object is provided
// for the device manager to update the node capacity to reflect the
// currently available devices.
Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
// Allocate configures and assigns devices to a container in a pod. From
// the requested device resources, Allocate will communicate with the
// owning device plugin to allow setup procedures to take place, and for
// the device plugin to provide runtime settings to use the device
// (environment variables, mount points and device files).
Allocate(pod *v1.Pod, container *v1.Container) error
// UpdatePluginResources updates node resources based on devices already
// allocated to pods. The node object is provided for the device manager to
// update the node capacity to reflect the currently available devices.
UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
// Stop stops the manager.
Stop() error

View File

@ -77,6 +77,10 @@ type HintProvider interface {
// a consensus "best" hint. The hint providers may subsequently query the
// topology manager to influence actual resource assignment.
GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint
// Allocate triggers resource allocation to occur on the HintProvider after
// all hints have been gathered and the aggregated Hint is available via a
// call to Store.GetAffinity().
Allocate(pod *v1.Pod, container *v1.Container) error
}
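
A self-contained sketch of the admit-time sequence this new method enables, using hypothetical types; for brevity the merged affinity is passed as a parameter here, whereas the real interface exposes it to providers through Store.GetAffinity(). Hints are gathered from every provider first, a single merged affinity is settled on, and only then is Allocate() called on each provider so its allocation can honor that affinity.

package main

import "fmt"

type topologyHint struct{ numaNode int }

type hintProvider interface {
    GetTopologyHints(containerName string) []topologyHint
    Allocate(containerName string, affinity topologyHint) error
}

type cpuProvider struct{}

func (cpuProvider) GetTopologyHints(string) []topologyHint { return []topologyHint{{0}, {1}} }
func (cpuProvider) Allocate(c string, a topologyHint) error {
    fmt.Printf("cpus for %s pinned to NUMA node %d\n", c, a.numaNode)
    return nil
}

type deviceProvider struct{}

func (deviceProvider) GetTopologyHints(string) []topologyHint { return []topologyHint{{0}} }
func (deviceProvider) Allocate(c string, a topologyHint) error {
    fmt.Printf("devices for %s pinned to NUMA node %d\n", c, a.numaNode)
    return nil
}

func admitContainer(name string, providers []hintProvider) error {
    // 1. Gather hints from every provider (a real policy would merge them;
    //    here we simply pick NUMA node 0, which both providers can satisfy).
    for _, p := range providers {
        _ = p.GetTopologyHints(name)
    }
    merged := topologyHint{numaNode: 0}

    // 2. With the merged affinity settled, trigger aligned allocation on
    //    every provider before the pod is admitted.
    for _, p := range providers {
        if err := p.Allocate(name, merged); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    _ = admitContainer("app", []hintProvider{cpuProvider{}, deviceProvider{}})
}

The real manager performs this per container inside Admit(), via the allocateAlignedResources() helper added further down.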
// Store is the interface that allows Hint Providers to retrieve pod affinity
@ -176,6 +180,16 @@ func (m *manager) accumulateProvidersHints(pod *v1.Pod, container *v1.Container)
return providersHints
}
func (m *manager) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error {
for _, provider := range m.hintProviders {
err := provider.Allocate(pod, container)
if err != nil {
return err
}
}
return nil
}
// Collect Hints from hint providers and pass to policy to retrieve the best one.
func (m *manager) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
providersHints := m.accumulateProvidersHints(pod, container)
@ -216,7 +230,6 @@ func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitR
klog.Infof("[topologymanager] Topology Admit Handler")
pod := attrs.Pod
hints := make(map[string]TopologyHint)
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
result, admit := m.calculateAffinity(pod, &container)
@ -227,11 +240,22 @@ func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitR
Admit: false,
}
}
hints[container.Name] = result
}
m.podTopologyHints[string(pod.UID)] = hints
klog.Infof("[topologymanager] Topology Affinity for Pod: %v are %v", pod.UID, m.podTopologyHints[string(pod.UID)])
klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", pod.UID, container.Name, result)
if m.podTopologyHints[string(pod.UID)] == nil {
m.podTopologyHints[string(pod.UID)] = make(map[string]TopologyHint)
}
m.podTopologyHints[string(pod.UID)][container.Name] = result
err := m.allocateAlignedResources(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
}
return lifecycle.PodAdmitResult{Admit: true}
}

View File

@ -75,12 +75,20 @@ func TestNewManager(t *testing.T) {
type mockHintProvider struct {
th map[string][]TopologyHint
//TODO: Add this field and add some tests to make sure things error out
//appropriately on allocation errors.
//allocateError error
}
func (m *mockHintProvider) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint {
return m.th
}
func (m *mockHintProvider) Allocate(pod *v1.Pod, container *v1.Container) error {
//return allocateError
return nil
}
func TestGetAffinity(t *testing.T) {
tcases := []struct {
name string

View File

@ -867,9 +867,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
}
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's