diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go
index caa3de666c7..8e60fab396d 100644
--- a/pkg/kubelet/cm/container_manager.go
+++ b/pkg/kubelet/cm/container_manager.go
@@ -25,7 +25,6 @@ import (
     internalapi "k8s.io/cri-api/pkg/apis"
     podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
-    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
     "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@@ -111,8 +110,8 @@ type ContainerManager interface {
     // due to node recreation.
     ShouldResetExtendedResourceCapacity() bool
 
-    // GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
-    GetTopologyPodAdmitHandler() topologymanager.Manager
+    // GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
+    GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
 
     // UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
     UpdateAllocatedDevices()
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index d5df6f18c50..98c3647193f 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -672,11 +672,51 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
 }
 
 func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
-    return cm.deviceManager.Allocate(node, attrs)
+    return cm.deviceManager.UpdatePluginResources(node, attrs)
 }
 
-func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
-    return cm.topologyManager
+func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
+    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
+        return cm.topologyManager
+    }
+    // TODO: we need to think about a better way to do this. This will work for
+    // now so long as we have only the cpuManager and deviceManager relying on
+    // allocations here. However, going forward it is not generalized enough to
+    // work as we add more and more hint providers that the TopologyManager
+    // needs to call Allocate() on (that may not be directly instantiated
+    // inside this component).
+    return &resourceAllocator{cm.cpuManager, cm.deviceManager}
+}
+
+type resourceAllocator struct {
+    cpuManager    cpumanager.Manager
+    deviceManager devicemanager.Manager
+}
+
+func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
+    pod := attrs.Pod
+
+    for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+        err := m.deviceManager.Allocate(pod, &container)
+        if err != nil {
+            return lifecycle.PodAdmitResult{
+                Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
+                Reason:  "UnexpectedAdmissionError",
+                Admit:   false,
+            }
+        }
+
+        err = m.cpuManager.Allocate(pod, &container)
+        if err != nil {
+            return lifecycle.PodAdmitResult{
+                Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
+                Reason:  "UnexpectedAdmissionError",
+                Admit:   false,
+            }
+        }
+    }
+
+    return lifecycle.PodAdmitResult{Admit: true}
 }
 
 func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go
index c21e8cce0dc..3ed3d264052 100644
--- a/pkg/kubelet/cm/container_manager_stub.go
+++ b/pkg/kubelet/cm/container_manager_stub.go
@@ -117,8 +117,8 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
     return cm.shouldResetExtendedResourceCapacity
 }
 
-func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager {
-    return nil
+func (cm *containerManagerStub) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
+    return topologymanager.NewFakeManager()
 }
 
 func (cm *containerManagerStub) UpdateAllocatedDevices() {
diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go
index 9cbcc67c188..5562951fa0f 100644
--- a/pkg/kubelet/cm/container_manager_windows.go
+++ b/pkg/kubelet/cm/container_manager_windows.go
@@ -177,7 +177,7 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
     return false
 }
 
-func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
+func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
     return nil
 }
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 3a86292133a..616a620f8ce 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -55,6 +55,11 @@ type Manager interface {
     // Start is called during Kubelet initialization.
     Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
 
+    // Called to trigger the allocation of CPUs to a container. This must be
+    // called at some point prior to the AddContainer() call for a container,
+    // e.g. at pod admission time.
+    Allocate(pod *v1.Pod, container *v1.Container) error
+
     // AddContainer is called between container create and container start
     // so that initial CPU affinity settings can be written through to the
     // container runtime before the first process begins to execute.
@@ -206,39 +211,33 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
     return nil
 }
 
-func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
+func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
     m.Lock()
-    // Proactively remove CPUs from init containers that have already run.
-    // They are guaranteed to have run to completion before any other
-    // container is run.
-    for _, initContainer := range p.Spec.InitContainers {
-        if c.Name != initContainer.Name {
-            err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
-            if err != nil {
-                klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
-            }
-        }
-    }
+    defer m.Unlock()
 
     // Call down into the policy to assign this container CPUs if required.
-    err := m.policyAddContainer(p, c, containerID)
+    err := m.policy.Allocate(m.state, p, c)
     if err != nil {
-        klog.Errorf("[cpumanager] AddContainer error: %v", err)
-        m.Unlock()
+        klog.Errorf("[cpumanager] Allocate error: %v", err)
         return err
     }
 
-    // Get the CPUs just assigned to the container (or fall back to the default
-    // CPUSet if none were assigned).
+    return nil
+}
+
+func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
+    m.Lock()
+    // Get the CPUs assigned to the container during Allocate()
+    // (or fall back to the default CPUSet if none were assigned).
     cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
     m.Unlock()
 
     if !cpus.IsEmpty() {
-        err = m.updateContainerCPUSet(containerID, cpus)
+        err := m.updateContainerCPUSet(containerID, cpus)
         if err != nil {
             klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
             m.Lock()
-            err := m.policyRemoveContainerByID(containerID)
+            err := m.policyRemoveContainerByRef(string(p.UID), c.Name)
             if err != nil {
                 klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
             }
@@ -246,6 +245,7 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
         }
         return err
     }
+    klog.V(5).Infof("[cpumanager] container CPUSet update skipped: assigned cpuset is empty")
     return nil
 }
 
@@ -263,14 +263,6 @@ func (m *manager) RemoveContainer(containerID string) error {
     return nil
 }
 
-func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
-    err := m.policy.AddContainer(m.state, p, c)
-    if err == nil {
-        m.containerMap.Add(string(p.UID), c.Name, containerID)
-    }
-    return err
-}
-
 func (m *manager) policyRemoveContainerByID(containerID string) error {
     podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
     if err != nil {
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 451943e6d4b..e9c7852c602 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -104,7 +104,7 @@ func (p *mockPolicy) Start(s state.State) error {
     return p.err
 }
 
-func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
+func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
     return p.err
 }
 
@@ -223,18 +223,20 @@ func TestCPUManagerAdd(t *testing.T) {
         cpuset.NewCPUSet(),
         topologymanager.NewFakeManager())
     testCases := []struct {
-        description string
-        updateErr   error
-        policy      Policy
-        expCPUSet   cpuset.CPUSet
-        expErr      error
+        description        string
+        updateErr          error
+        policy             Policy
+        expCPUSet          cpuset.CPUSet
+        expAllocateErr     error
+        expAddContainerErr error
     }{
         {
-            description: "cpu manager add - no error",
-            updateErr:   nil,
-            policy:      testPolicy,
-            expCPUSet:   cpuset.NewCPUSet(3, 4),
-            expErr:      nil,
+            description:        "cpu manager add - no error",
+            updateErr:          nil,
+            policy:             testPolicy,
+            expCPUSet:          cpuset.NewCPUSet(3, 4),
+            expAllocateErr:     nil,
+            expAddContainerErr: nil,
         },
         {
             description: "cpu manager add - policy add container error",
@@ -242,15 +244,17 @@ func TestCPUManagerAdd(t *testing.T) {
             policy: &mockPolicy{
                 err: fmt.Errorf("fake reg error"),
             },
-            expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
-            expErr:    fmt.Errorf("fake reg error"),
+            expCPUSet:          cpuset.NewCPUSet(1, 2, 3, 4),
+            expAllocateErr:     fmt.Errorf("fake reg error"),
+            expAddContainerErr: nil,
         },
         {
-            description: "cpu manager add - container update error",
-            updateErr:   fmt.Errorf("fake update error"),
-            policy:      testPolicy,
-            expCPUSet:   cpuset.NewCPUSet(1, 2, 3, 4),
-            expErr:      fmt.Errorf("fake update error"),
+            description:        "cpu manager add - container update error",
+            updateErr:          fmt.Errorf("fake update error"),
+            policy:             testPolicy,
+            expCPUSet:          cpuset.NewCPUSet(1, 2, 3, 4),
+            expAllocateErr:     nil,
+            expAddContainerErr: fmt.Errorf("fake update error"),
         },
     }
 
@@ -271,10 +275,16 @@ func TestCPUManagerAdd(t *testing.T) {
         pod := makePod("fakePod", "fakeContainer", "2", "2")
         container := &pod.Spec.Containers[0]
-        err := mgr.AddContainer(pod, container, "fakeID")
-        if !reflect.DeepEqual(err, testCase.expErr) {
+        err := mgr.Allocate(pod, container)
+        if !reflect.DeepEqual(err, testCase.expAllocateErr) {
+            t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
+                testCase.description, testCase.expAllocateErr, err)
+        }
+
+        err = mgr.AddContainer(pod, container, "fakeID")
+        if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
             t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
-                testCase.description, testCase.expErr, err)
+                testCase.description, testCase.expAddContainerErr, err)
         }
         if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
             t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
@@ -494,7 +504,12 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
             testCase.expCSets...)
 
         for i := range containers {
-            err := mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
+            err := mgr.Allocate(testCase.pod, &containers[i])
+            if err != nil {
+                t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v",
+                    testCase.description, containerIDs[i], err)
+            }
+            err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
             if err != nil {
                 t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
                     testCase.description, containerIDs[i], err)
@@ -970,25 +985,28 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
         cpuset.NewCPUSet(0),
         topologymanager.NewFakeManager())
     testCases := []struct {
-        description string
-        updateErr   error
-        policy      Policy
-        expCPUSet   cpuset.CPUSet
-        expErr      error
+        description        string
+        updateErr          error
+        policy             Policy
+        expCPUSet          cpuset.CPUSet
+        expAllocateErr     error
+        expAddContainerErr error
     }{
         {
-            description: "cpu manager add - no error",
-            updateErr:   nil,
-            policy:      testPolicy,
-            expCPUSet:   cpuset.NewCPUSet(0, 3),
-            expErr:      nil,
+            description:        "cpu manager add - no error",
+            updateErr:          nil,
+            policy:             testPolicy,
+            expCPUSet:          cpuset.NewCPUSet(0, 3),
+            expAllocateErr:     nil,
+            expAddContainerErr: nil,
         },
         {
-            description: "cpu manager add - container update error",
-            updateErr:   fmt.Errorf("fake update error"),
-            policy:      testPolicy,
-            expCPUSet:   cpuset.NewCPUSet(0, 1, 2, 3),
-            expErr:      fmt.Errorf("fake update error"),
+            description:        "cpu manager add - container update error",
+            updateErr:          fmt.Errorf("fake update error"),
+            policy:             testPolicy,
+            expCPUSet:          cpuset.NewCPUSet(0, 1, 2, 3),
+            expAllocateErr:     nil,
+            expAddContainerErr: fmt.Errorf("fake update error"),
         },
     }
 
@@ -1009,10 +1027,16 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
         pod := makePod("fakePod", "fakeContainer", "2", "2")
         container := &pod.Spec.Containers[0]
-        err := mgr.AddContainer(pod, container, "fakeID")
-        if !reflect.DeepEqual(err, testCase.expErr) {
+        err := mgr.Allocate(pod, container)
+        if !reflect.DeepEqual(err, testCase.expAllocateErr) {
+            t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
+                testCase.description, testCase.expAllocateErr, err)
+        }
+
+        err = mgr.AddContainer(pod, container, "fakeID")
+        if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
             t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
-                testCase.description, testCase.expErr, err)
+                testCase.description, testCase.expAddContainerErr, err)
         }
         if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
             t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
index d958935db62..b5537f787b6 100644
--- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -40,6 +40,11 @@ func (m *fakeManager) Policy() Policy {
     return NewNonePolicy()
 }
 
+func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
+    klog.Infof("[fake cpumanager] Allocate (pod: %s, container: %s)", pod.Name, container.Name)
+    return nil
+}
+
 func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
     klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
     return nil
diff --git a/pkg/kubelet/cm/cpumanager/policy.go b/pkg/kubelet/cm/cpumanager/policy.go
index 2031612be91..958dac0c2af 100644
--- a/pkg/kubelet/cm/cpumanager/policy.go
+++ b/pkg/kubelet/cm/cpumanager/policy.go
@@ -26,8 +26,8 @@ import (
 type Policy interface {
     Name() string
     Start(s state.State) error
-    // AddContainer call is idempotent
-    AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
+    // Allocate call is idempotent
+    Allocate(s state.State, pod *v1.Pod, container *v1.Container) error
     // RemoveContainer call is idempotent
     RemoveContainer(s state.State, podUID string, containerName string) error
     // GetTopologyHints implements the topologymanager.HintProvider Interface
diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go
index 77cd367b642..bc71fe8f29d 100644
--- a/pkg/kubelet/cm/cpumanager/policy_none.go
+++ b/pkg/kubelet/cm/cpumanager/policy_none.go
@@ -44,7 +44,7 @@ func (p *nonePolicy) Start(s state.State) error {
     return nil
 }
 
-func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
+func (p *nonePolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
     return nil
 }
diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go
index a28dfa6a0e9..d2dcbecdd2e 100644
--- a/pkg/kubelet/cm/cpumanager/policy_none_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go
@@ -33,7 +33,7 @@ func TestNonePolicyName(t *testing.T) {
     }
 }
 
-func TestNonePolicyAdd(t *testing.T) {
+func TestNonePolicyAllocate(t *testing.T) {
     policy := &nonePolicy{}
 
     st := &mockState{
@@ -44,9 +44,9 @@ func TestNonePolicyAdd(t *testing.T) {
     testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m")
     container := &testPod.Spec.Containers[0]
 
-    err := policy.AddContainer(st, testPod, container)
+    err := policy.Allocate(st, testPod, container)
     if err != nil {
-        t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err)
+        t.Errorf("NonePolicy Allocate() error. expected no error but got: %v", err)
     }
 }
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index c088df866eb..da68ed808bd 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -188,9 +188,9 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
     return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
-func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
+func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
     if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
-        klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s)", pod.Name, container.Name)
+        klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
         // container belongs in an exclusively allocated pool
 
         if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
@@ -209,6 +209,17 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
             return err
         }
         s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+
+        // Check if the container that has just been allocated resources is an init container.
+        // If so, release its CPUs back into the shared pool so they can be reallocated.
+        for _, initContainer := range pod.Spec.InitContainers {
+            if container.Name == initContainer.Name {
+                if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+                    // Mutate the shared pool, adding released cpus.
+                    s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
+                }
+            }
+        }
     }
     // container belongs in the shared pool (nothing to do; use default cpuset)
     return nil
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 9e67692b459..ea2bcf11333 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -444,26 +444,26 @@ func TestStaticPolicyAdd(t *testing.T) {
         }
 
         container := &testCase.pod.Spec.Containers[0]
-        err := policy.AddContainer(st, testCase.pod, container)
+        err := policy.Allocate(st, testCase.pod, container)
         if !reflect.DeepEqual(err, testCase.expErr) {
-            t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
+            t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
                 testCase.description, testCase.expErr, err)
         }
 
         if testCase.expCPUAlloc {
             cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
             if !found {
-                t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v",
                     testCase.description, container.Name, st.assignments)
             }
 
             if !reflect.DeepEqual(cset, testCase.expCSet) {
-                t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
                     testCase.description, testCase.expCSet, cset)
             }
 
             if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
-                t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disjoint from the shared cpuset %v",
                     testCase.description, cset, st.defaultCPUSet)
             }
         }
@@ -471,7 +471,7 @@ func TestStaticPolicyAdd(t *testing.T) {
         if !testCase.expCPUAlloc {
             _, found := st.assignments[string(testCase.pod.UID)][container.Name]
             if found {
-                t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v",
                     testCase.description, container.Name, st.assignments)
             }
         }
@@ -786,26 +786,26 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
         }
 
         container := &testCase.pod.Spec.Containers[0]
-        err := policy.AddContainer(st, testCase.pod, container)
+        err := policy.Allocate(st, testCase.pod, container)
         if !reflect.DeepEqual(err, testCase.expErr) {
-            t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
+            t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
                 testCase.description, testCase.expErr, err)
         }
 
         if testCase.expCPUAlloc {
             cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
             if !found {
-                t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v",
                     testCase.description, container.Name, st.assignments)
             }
 
             if !reflect.DeepEqual(cset, testCase.expCSet) {
-                t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
                     testCase.description, testCase.expCSet, cset)
             }
 
             if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
-                t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disjoint from the shared cpuset %v",
                     testCase.description, cset, st.defaultCPUSet)
             }
         }
@@ -813,7 +813,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
         if !testCase.expCPUAlloc {
             _, found := st.assignments[string(testCase.pod.UID)][container.Name]
             if found {
-                t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v",
+                t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v",
                     testCase.description, container.Name, st.assignments)
             }
         }
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 4b5838ae033..65b91393ae4 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -105,6 +105,10 @@ type ManagerImpl struct {
 
     // Store of Topology Affinties that the Device Manager can query.
     topologyAffinityStore topologymanager.Store
+
+    // devicesToReuse contains devices that can be reused as they have been
+    // allocated to init containers.
+    devicesToReuse PodReusableDevices
 }
 
 type endpointInfo struct {
@@ -114,6 +118,9 @@ type endpointInfo struct {
 
 type sourcesReadyStub struct{}
 
+// PodReusableDevices is a map by pod UID of devices to reuse.
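+// The outer map key is the pod UID, the inner key is the extended resource
+// name, and the value is the set of device IDs available for reuse.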
+type PodReusableDevices map[string]map[string]sets.String
+
 func (s *sourcesReadyStub) AddSource(source string) {}
 func (s *sourcesReadyStub) AllReady() bool          { return true }
 
@@ -147,6 +154,7 @@ func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, to
         podDevices:            make(podDevices),
         numaNodes:             numaNodes,
         topologyAffinityStore: topologyAffinityStore,
+        devicesToReuse:        make(PodReusableDevices),
     }
 
     manager.callback = manager.genericDeviceUpdateCallback
@@ -350,32 +358,41 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
     return false
 }
 
-func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
-    devicesToReuse := make(map[string]sets.String)
-    for _, container := range pod.Spec.InitContainers {
-        if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
-            return err
-        }
-        m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
-    }
-    for _, container := range pod.Spec.Containers {
-        if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
-            return err
-        }
-        m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
-    }
-    return nil
-}
-
 // Allocate is the call that you can use to allocate a set of devices
 // from the registered device plugins.
-func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
-    pod := attrs.Pod
-    err := m.allocatePodResources(pod)
-    if err != nil {
-        klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
+func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
+    if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
+        m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
+    }
+    // If entries for any pods other than the current pod exist in
+    // m.devicesToReuse, delete them.
+    for podUID := range m.devicesToReuse {
+        if podUID != string(pod.UID) {
+            delete(m.devicesToReuse, podUID)
+        }
+    }
+    // Allocate resources for init containers first, as we know the caller
+    // always loops through init containers before looping through app
+    // containers. Should the caller ever change those semantics, this logic
+    // will need to be amended.
+    for _, initContainer := range pod.Spec.InitContainers {
+        if container.Name == initContainer.Name {
+            if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
+                return err
+            }
+            m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
+            return nil
+        }
+    }
+    if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
         return err
     }
+    m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
+    return nil
+}
+
+// UpdatePluginResources updates node resources based on devices already allocated to pods.
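+// It raises the allocatable scalar resources recorded on the node info when
+// they under-report devices already allocated to pods on this node (the role
+// previously played by sanitizeNodeAllocatable, per the test rename below).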
+func (m *ManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+    pod := attrs.Pod
 
     m.mutex.Lock()
     defer m.mutex.Unlock()
@@ -860,8 +877,8 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
         }
     }
     if needsReAllocate {
-        klog.V(2).Infof("needs re-allocate device plugin resources for pod %s", podUID)
-        if err := m.allocatePodResources(pod); err != nil {
+        klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", podUID, container.Name)
+        if err := m.Allocate(pod, container); err != nil {
             return nil, err
         }
     }
diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go
index 4301c7dc3b9..ed6fb41e58e 100644
--- a/pkg/kubelet/cm/devicemanager/manager_stub.go
+++ b/pkg/kubelet/cm/devicemanager/manager_stub.go
@@ -45,7 +45,12 @@ func (h *ManagerStub) Stop() error {
 }
 
 // Allocate simply returns nil.
-func (h *ManagerStub) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+func (h *ManagerStub) Allocate(pod *v1.Pod, container *v1.Container) error {
+    return nil
+}
+
+// UpdatePluginResources simply returns nil.
+func (h *ManagerStub) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
     return nil
 }
 
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 5b7e8700594..cefd25937a0 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -30,6 +30,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/apimachinery/pkg/util/wait"
@@ -604,6 +605,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
         allocatedDevices:      make(map[string]sets.String),
         endpoints:             make(map[string]endpointInfo),
         podDevices:            make(podDevices),
+        devicesToReuse:        make(PodReusableDevices),
         topologyAffinityStore: topologymanager.NewFakeManager(),
         activePods:            activePods,
         sourcesReady:          &sourcesReadyStub{},
@@ -648,17 +650,6 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
     return testManager, nil
 }
 
-func getTestNodeInfo(allocatable v1.ResourceList) *schedulernodeinfo.NodeInfo {
-    cachedNode := &v1.Node{
-        Status: v1.NodeStatus{
-            Allocatable: allocatable,
-        },
-    }
-    nodeInfo := &schedulernodeinfo.NodeInfo{}
-    nodeInfo.SetNode(cachedNode)
-    return nodeInfo
-}
-
 type TestResource struct {
     resourceName     string
     resourceQuantity resource.Quantity
@@ -686,7 +677,6 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
     tmpDir, err := ioutil.TempDir("", "checkpoint")
     as.Nil(err)
     defer os.RemoveAll(tmpDir)
-    nodeInfo := getTestNodeInfo(v1.ResourceList{})
     testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
     as.Nil(err)
 
@@ -738,7 +728,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
         pod := testCase.testPod
         activePods = append(activePods, pod)
         podsStub.updateActivePods(activePods)
-        err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
+        err := testManager.Allocate(pod, &pod.Spec.Containers[0])
         if !reflect.DeepEqual(err, testCase.expErr) {
             t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
                 testCase.description, testCase.expErr, err)
@@ -780,7 +770,6 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
     podsStub := activePodsStub{
         activePods: []*v1.Pod{},
     }
-    nodeInfo := getTestNodeInfo(v1.ResourceList{})
     tmpDir, err := ioutil.TempDir("", "checkpoint")
     as.Nil(err)
     defer os.RemoveAll(tmpDir)
@@ -834,7 +823,12 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
         },
     }
     podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
-    err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers})
+    for _, container := range podWithPluginResourcesInInitContainers.Spec.InitContainers {
+        err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
+    }
+    for _, container := range podWithPluginResourcesInInitContainers.Spec.Containers {
+        err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
+    }
     as.Nil(err)
     podUID := string(podWithPluginResourcesInInitContainers.UID)
     initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
@@ -855,7 +849,10 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
     as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
 }
 
-func TestSanitizeNodeAllocatable(t *testing.T) {
+func TestUpdatePluginResources(t *testing.T) {
+    pod := &v1.Pod{}
+    pod.UID = types.UID("testPod")
+
     resourceName1 := "domain1.com/resource1"
     devID1 := "dev1"
 
@@ -876,6 +873,8 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
         podDevices:        make(podDevices),
         checkpointManager: ckm,
     }
+    testManager.podDevices[string(pod.UID)] = make(containerDevices)
+
     // require one of resource1 and one of resource2
     testManager.allocatedDevices[resourceName1] = sets.NewString()
     testManager.allocatedDevices[resourceName1].Insert(devID1)
@@ -893,7 +892,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
     nodeInfo := &schedulernodeinfo.NodeInfo{}
     nodeInfo.SetNode(cachedNode)
 
-    testManager.sanitizeNodeAllocatable(nodeInfo)
+    testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
 
     allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
     // allocatable in nodeInfo is less than needed, should update
@@ -918,7 +917,6 @@ func TestDevicePreStartContainer(t *testing.T) {
     tmpDir, err := ioutil.TempDir("", "checkpoint")
     as.Nil(err)
     defer os.RemoveAll(tmpDir)
-    nodeInfo := getTestNodeInfo(v1.ResourceList{})
     testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
     as.Nil(err)
 
@@ -936,7 +934,7 @@ func TestDevicePreStartContainer(t *testing.T) {
     activePods := []*v1.Pod{}
     activePods = append(activePods, pod)
     podsStub.updateActivePods(activePods)
-    err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
+    err = testManager.Allocate(pod, &pod.Spec.Containers[0])
     as.Nil(err)
     runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
     as.Nil(err)
diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go
index 1b76a89a1df..9fcafe53ec4 100644
--- a/pkg/kubelet/cm/devicemanager/types.go
+++ b/pkg/kubelet/cm/devicemanager/types.go
@@ -34,15 +34,17 @@ type Manager interface {
     // Start starts device plugin registration service.
     Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
 
-    // Allocate configures and assigns devices to pods. The pods are provided
-    // through the pod admission attributes in the attrs argument. From the
-    // requested device resources, Allocate will communicate with the owning
-    // device plugin to allow setup procedures to take place, and for the
-    // device plugin to provide runtime settings to use the device (environment
-    // variables, mount points and device files). The node object is provided
-    // for the device manager to update the node capacity to reflect the
-    // currently available devices.
-    Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
+    // Allocate configures and assigns devices to a container in a pod. From
+    // the requested device resources, Allocate will communicate with the
+    // owning device plugin to allow setup procedures to take place, and for
+    // the device plugin to provide runtime settings to use the device
+    // (environment variables, mount points and device files).
+    Allocate(pod *v1.Pod, container *v1.Container) error
+
+    // UpdatePluginResources updates node resources based on devices already
+    // allocated to pods. The node object is provided for the device manager to
+    // update the node capacity to reflect the currently available devices.
+    UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
 
     // Stop stops the manager.
     Stop() error
diff --git a/pkg/kubelet/cm/topologymanager/topology_manager.go b/pkg/kubelet/cm/topologymanager/topology_manager.go
index dab59aa8aa6..926260bf837 100644
--- a/pkg/kubelet/cm/topologymanager/topology_manager.go
+++ b/pkg/kubelet/cm/topologymanager/topology_manager.go
@@ -77,6 +77,10 @@ type HintProvider interface {
     // a consensus "best" hint. The hint providers may subsequently query the
     // topology manager to influence actual resource assignment.
     GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint
+    // Allocate triggers resource allocation to occur on the HintProvider after
+    // all hints have been gathered and the aggregated Hint is available via a
+    // call to Store.GetAffinity().
+    Allocate(pod *v1.Pod, container *v1.Container) error
 }
 
 //Store interface is to allow Hint Providers to retrieve pod affinity
@@ -176,6 +180,16 @@ func (m *manager) accumulateProvidersHints(pod *v1.Pod, container *v1.Container)
     return providersHints
 }
 
+func (m *manager) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error {
+    for _, provider := range m.hintProviders {
+        err := provider.Allocate(pod, container)
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
 // Collect Hints from hint providers and pass to policy to retrieve the best one.
 func (m *manager) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
     providersHints := m.accumulateProvidersHints(pod, container)
@@ -216,7 +230,6 @@ func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitR
     klog.Infof("[topologymanager] Topology Admit Handler")
 
     pod := attrs.Pod
-    hints := make(map[string]TopologyHint)
 
     for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
         result, admit := m.calculateAffinity(pod, &container)
@@ -227,11 +240,22 @@ func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitR
                 Admit:   false,
             }
         }
-        hints[container.Name] = result
-    }
 
-    m.podTopologyHints[string(pod.UID)] = hints
-    klog.Infof("[topologymanager] Topology Affinity for Pod: %v are %v", pod.UID, m.podTopologyHints[string(pod.UID)])
+        klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", pod.UID, container.Name, result)
+        if m.podTopologyHints[string(pod.UID)] == nil {
+            m.podTopologyHints[string(pod.UID)] = make(map[string]TopologyHint)
+        }
+        m.podTopologyHints[string(pod.UID)][container.Name] = result
+
+        err := m.allocateAlignedResources(pod, &container)
+        if err != nil {
+            return lifecycle.PodAdmitResult{
+                Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
+                Reason:  "UnexpectedAdmissionError",
+                Admit:   false,
+            }
+        }
+    }
 
     return lifecycle.PodAdmitResult{Admit: true}
 }
diff --git a/pkg/kubelet/cm/topologymanager/topology_manager_test.go b/pkg/kubelet/cm/topologymanager/topology_manager_test.go
index 2176306c99e..e08d4c142c1 100644
--- a/pkg/kubelet/cm/topologymanager/topology_manager_test.go
+++ b/pkg/kubelet/cm/topologymanager/topology_manager_test.go
@@ -75,12 +75,20 @@ func TestNewManager(t *testing.T) {
 
 type mockHintProvider struct {
     th map[string][]TopologyHint
+    //TODO: Add this field and add some tests to make sure things error out
+    //appropriately on allocation errors.
+    //allocateError error
 }
 
 func (m *mockHintProvider) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint {
     return m.th
 }
 
+func (m *mockHintProvider) Allocate(pod *v1.Pod, container *v1.Container) error {
+    //return m.allocateError
+    return nil
+}
+
 func TestGetAffinity(t *testing.T) {
     tcases := []struct {
         name string
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 2830f7c0709..e9e56ce8c87 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -867,9 +867,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     }
     klet.AddPodSyncLoopHandler(activeDeadlineHandler)
     klet.AddPodSyncHandler(activeDeadlineHandler)
-    if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
-        klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
-    }
+
+    klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
+
     criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
     klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
     // apply functional Option's
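
Editor's note — illustrative sketch, not part of the patch above. The diff splits resource management into two phases: Allocate(), invoked once per container at pod admission time (either through the TopologyManager's Admit() or, when the TopologyManager feature gate is disabled, through the fallback resourceAllocator), and the existing container-start paths (AddContainer(), GetDeviceRunContainerOptions()) that only consume what Allocate() already committed. The sketch below shows what the extended topologymanager.HintProvider contract asks of an implementer, using the interface exactly as declared in the diff; the stubProvider type, its constructor, and its bookkeeping map are hypothetical stand-ins, not kubelet code.

package example // hypothetical package, for illustration only

import (
    v1 "k8s.io/api/core/v1"

    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

// stubProvider illustrates the two-phase contract: GetTopologyHints() only
// reports placement options and should stay side-effect free, while
// Allocate() is the point at which resources are actually committed.
type stubProvider struct {
    allocated map[string]bool // hypothetical record of per-container allocations
}

// newStubProvider returns a ready-to-use provider (hypothetical helper).
func newStubProvider() *stubProvider {
    return &stubProvider{allocated: map[string]bool{}}
}

// GetTopologyHints reports possible placements for the container; returning
// an empty map means "no preference" for every resource this provider owns.
func (p *stubProvider) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
    return map[string][]topologymanager.TopologyHint{}
}

// Allocate runs at pod admission, once per container, after the manager has
// merged all providers' hints; the merged affinity can be read back through
// the Store.GetAffinity() method shown in the diff.
func (p *stubProvider) Allocate(pod *v1.Pod, container *v1.Container) error {
    p.allocated[string(pod.UID)+"/"+container.Name] = true
    return nil
}

// Compile-time check that stubProvider satisfies the extended interface.
var _ topologymanager.HintProvider = &stubProvider{}

With this contract in place, a failure from any provider's Allocate() surfaces at admission as the "UnexpectedAdmissionError" result constructed in topology_manager.go and container_manager_linux.go above, rather than as a late failure at container start.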