diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 1c95b652918..eeea6a8b7e4 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -300,13 +300,14 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I cm.topologyManager, err = topologymanager.NewManager( machineInfo.Topology, nodeConfig.ExperimentalTopologyManagerPolicy, + nodeConfig.ExperimentalTopologyManagerScope, ) if err != nil { return nil, err } - klog.Infof("[topologymanager] Initializing Topology Manager with %s policy", nodeConfig.ExperimentalTopologyManagerPolicy) + klog.Infof("[topologymanager] Initializing Topology Manager with %s policy and %s-level scope", nodeConfig.ExperimentalTopologyManagerPolicy, nodeConfig.ExperimentalTopologyManagerScope) } else { cm.topologyManager = topologymanager.NewFakeManager() } diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go new file mode 100644 index 00000000000..d26636298b5 --- /dev/null +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -0,0 +1,148 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologymanager + +import ( + "fmt" + + "sync" + + "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" +) + +const ( + // containerTopologyScope specifies the TopologyManagerScope per container. + containerTopologyScope = "container" + // podTopologyScope specifies the TopologyManagerScope per pod. + podTopologyScope = "pod" +) + +type podTopologyHints map[string]map[string]TopologyHint + +// Scope interface for Topology Manager +type Scope interface { + Name() string + Admit(pod *v1.Pod) lifecycle.PodAdmitResult + // AddHintProvider adds a hint provider to manager to indicate the hint provider + // wants to be consoluted with when making topology hints + AddHintProvider(h HintProvider) + // AddContainer adds pod to Manager for tracking + AddContainer(pod *v1.Pod, containerID string) error + // RemoveContainer removes pod from Manager tracking + RemoveContainer(containerID string) error + // Store is the interface for storing pod topology hints + Store +} + +type scope struct { + mutex sync.Mutex + name string + // Mapping of a Pods mapping of Containers and their TopologyHints + // Indexed by PodUID to ContainerName + podTopologyHints podTopologyHints + // The list of components registered with the Manager + hintProviders []HintProvider + // Topology Manager Policy + policy Policy + // Mapping of PodUID to ContainerID for Adding/Removing Pods from PodTopologyHints mapping + podMap map[string]string +} + +func (s *scope) Name() string { + return s.name +} + +func (s *scope) GetAffinity(podUID string, containerName string) TopologyHint { + return s.podTopologyHints[podUID][containerName] +} + +func (s *scope) AddHintProvider(h HintProvider) { + s.hintProviders = append(s.hintProviders, h) +} + +// It would be better to implement this function in topologymanager instead of scope +// but topologymanager do not track mapping anymore +func (s *scope) AddContainer(pod *v1.Pod, containerID string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.podMap[containerID] = string(pod.UID) + return nil +} + +// It would be better to implement this function in topologymanager instead of scope +// but topologymanager do not track mapping anymore +func (s *scope) RemoveContainer(containerID string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + klog.Infof("[topologymanager] RemoveContainer - Container ID: %v", containerID) + podUIDString := s.podMap[containerID] + delete(s.podMap, containerID) + if _, exists := s.podTopologyHints[podUIDString]; exists { + delete(s.podTopologyHints[podUIDString], containerID) + if len(s.podTopologyHints[podUIDString]) == 0 { + delete(s.podTopologyHints, podUIDString) + } + } + + return nil +} + +func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult { + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { + err := s.allocateAlignedResources(pod, &container) + if err != nil { + return unexpectedAdmissionError(err) + } + } + return admitPod() +} + +// It would be better to implement this function in topologymanager instead of scope +// but topologymanager do not track providers anymore +func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error { + for _, provider := range s.hintProviders { + err := provider.Allocate(pod, container) + if err != nil { + return err + } + } + return nil +} + +func topologyAffinityError() lifecycle.PodAdmitResult { + return lifecycle.PodAdmitResult{ + Message: "Resources cannot be allocated with Topology locality", + Reason: "TopologyAffinityError", + Admit: false, + } +} + +func unexpectedAdmissionError(err error) lifecycle.PodAdmitResult { + return lifecycle.PodAdmitResult{ + Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), + Reason: "UnexpectedAdmissionError", + Admit: false, + } +} + +func admitPod() lifecycle.PodAdmitResult { + return lifecycle.PodAdmitResult{Admit: true} +} diff --git a/pkg/kubelet/cm/topologymanager/scope_container.go b/pkg/kubelet/cm/topologymanager/scope_container.go new file mode 100644 index 00000000000..4908e5f047e --- /dev/null +++ b/pkg/kubelet/cm/topologymanager/scope_container.go @@ -0,0 +1,90 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologymanager + +import ( + "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/util/format" +) + +type containerScope struct { + scope +} + +// Ensure containerScope implements Scope interface +var _ Scope = &containerScope{} + +// NewContainerScope returns a container scope. +func NewContainerScope(policy Policy) Scope { + return &containerScope{ + scope{ + name: containerTopologyScope, + podTopologyHints: podTopologyHints{}, + policy: policy, + podMap: make(map[string]string), + }, + } +} + +func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { + // Exception - Policy : none + if s.policy.Name() == PolicyNone { + return s.admitPolicyNone(pod) + } + + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { + bestHint, admit := s.calculateAffinity(pod, &container) + klog.Infof("[topologymanager] Best TopologyHint for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint) + + if !admit { + return topologyAffinityError() + } + + if (s.podTopologyHints)[string(pod.UID)] == nil { + (s.podTopologyHints)[string(pod.UID)] = make(map[string]TopologyHint) + } + + klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint) + (s.podTopologyHints)[string(pod.UID)][container.Name] = bestHint + err := s.allocateAlignedResources(pod, &container) + if err != nil { + return unexpectedAdmissionError(err) + } + } + return admitPod() +} + +func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint { + var providersHints []map[string][]TopologyHint + + for _, provider := range s.hintProviders { + // Get the TopologyHints for a Container from a provider. + hints := provider.GetTopologyHints(pod, container) + providersHints = append(providersHints, hints) + klog.Infof("[topologymanager] TopologyHints for pod '%v', container '%v': %v", format.Pod(pod), container.Name, hints) + } + return providersHints +} + +func (s *containerScope) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) { + providersHints := s.accumulateProvidersHints(pod, container) + bestHint, admit := s.policy.Merge(providersHints) + klog.Infof("[topologymanager] ContainerTopologyHint: %v", bestHint) + return bestHint, admit +} diff --git a/pkg/kubelet/cm/topologymanager/scope_pod.go b/pkg/kubelet/cm/topologymanager/scope_pod.go new file mode 100644 index 00000000000..033577d2d64 --- /dev/null +++ b/pkg/kubelet/cm/topologymanager/scope_pod.go @@ -0,0 +1,91 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologymanager + +import ( + "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/util/format" +) + +type podScope struct { + scope +} + +// Ensure podScope implements Scope interface +var _ Scope = &podScope{} + +// NewPodScope returns a pod scope. +func NewPodScope(policy Policy) Scope { + return &podScope{ + scope{ + name: podTopologyScope, + podTopologyHints: podTopologyHints{}, + policy: policy, + podMap: make(map[string]string), + }, + } +} + +func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { + // Exception - Policy : none + if s.policy.Name() == PolicyNone { + return s.admitPolicyNone(pod) + } + + bestHint, admit := s.calculateAffinity(pod) + klog.Infof("[topologymanager] Best TopologyHint for (pod: %v): %v", format.Pod(pod), bestHint) + if !admit { + return topologyAffinityError() + } + + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { + klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint) + + if (s.podTopologyHints)[string(pod.UID)] == nil { + (s.podTopologyHints)[string(pod.UID)] = make(map[string]TopologyHint) + } + + (s.podTopologyHints)[string(pod.UID)][container.Name] = bestHint + + err := s.allocateAlignedResources(pod, &container) + if err != nil { + return unexpectedAdmissionError(err) + } + } + return admitPod() +} + +func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint { + var providersHints []map[string][]TopologyHint + + for _, provider := range s.hintProviders { + // Get the TopologyHints for a Pod from a provider. + hints := provider.GetPodTopologyHints(pod) + providersHints = append(providersHints, hints) + klog.Infof("[topologymanager] TopologyHints for pod '%v': %v", format.Pod(pod), hints) + } + return providersHints +} + +func (s *podScope) calculateAffinity(pod *v1.Pod) (TopologyHint, bool) { + providersHints := s.accumulateProvidersHints(pod) + bestHint, admit := s.policy.Merge(providersHints) + klog.Infof("[topologymanager] PodTopologyHint: %v", bestHint) + return bestHint, admit +} diff --git a/pkg/kubelet/cm/topologymanager/topology_manager.go b/pkg/kubelet/cm/topologymanager/topology_manager.go index 4ee524ebec6..00ebf490bd3 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager.go @@ -18,10 +18,9 @@ package topologymanager import ( "fmt" - "sync" cadvisorapi "github.com/google/cadvisor/info/v1" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -55,16 +54,8 @@ type Manager interface { } type manager struct { - mutex sync.Mutex - //The list of components registered with the Manager - hintProviders []HintProvider - //Mapping of a Pods mapping of Containers and their TopologyHints - //Indexed by PodUID to ContainerName - podTopologyHints map[string]map[string]TopologyHint - //Mapping of PodUID to ContainerID for Adding/Removing Pods from PodTopologyHints mapping - podMap map[string]string - //Topology Manager Policy - policy Policy + //Topology Manager Scope + scope Scope } // HintProvider is an interface for components that want to collaborate to @@ -124,9 +115,9 @@ func (th *TopologyHint) LessThan(other TopologyHint) bool { var _ Manager = &manager{} -// NewManager creates a new TopologyManager based on provided policy -func NewManager(topology []cadvisorapi.Node, topologyPolicyName string) (Manager, error) { - klog.Infof("[topologymanager] Creating topology manager with %s policy", topologyPolicyName) +// NewManager creates a new TopologyManager based on provided policy and scope +func NewManager(topology []cadvisorapi.Node, topologyPolicyName string, topologyScopeName string) (Manager, error) { + klog.Infof("[topologymanager] Creating topology manager with %s policy per %s scope", topologyPolicyName, topologyScopeName) var numaNodes []int for _, node := range topology { @@ -156,123 +147,45 @@ func NewManager(topology []cadvisorapi.Node, topologyPolicyName string) (Manager return nil, fmt.Errorf("unknown policy: \"%s\"", topologyPolicyName) } - var hp []HintProvider - pth := make(map[string]map[string]TopologyHint) - pm := make(map[string]string) + var scope Scope + switch topologyScopeName { + + case containerTopologyScope: + scope = NewContainerScope(policy) + + case podTopologyScope: + scope = NewPodScope(policy) + + default: + return nil, fmt.Errorf("unknown scope: \"%s\"", topologyScopeName) + } + manager := &manager{ - hintProviders: hp, - podTopologyHints: pth, - podMap: pm, - policy: policy, + scope: scope, } return manager, nil } func (m *manager) GetAffinity(podUID string, containerName string) TopologyHint { - return m.podTopologyHints[podUID][containerName] -} - -func (m *manager) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) (providersHints []map[string][]TopologyHint) { - // Loop through all hint providers and save an accumulated list of the - // hints returned by each hint provider. - for _, provider := range m.hintProviders { - // Get the TopologyHints from a provider. - hints := provider.GetTopologyHints(pod, container) - providersHints = append(providersHints, hints) - klog.Infof("[topologymanager] TopologyHints for pod '%v', container '%v': %v", pod.Name, container.Name, hints) - } - return providersHints -} - -func (m *manager) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error { - for _, provider := range m.hintProviders { - err := provider.Allocate(pod, container) - if err != nil { - return err - } - } - return nil -} - -// Collect Hints from hint providers and pass to policy to retrieve the best one. -func (m *manager) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) { - providersHints := m.accumulateProvidersHints(pod, container) - bestHint, admit := m.policy.Merge(providersHints) - klog.Infof("[topologymanager] ContainerTopologyHint: %v", bestHint) - return bestHint, admit + return m.scope.GetAffinity(podUID, containerName) } func (m *manager) AddHintProvider(h HintProvider) { - m.hintProviders = append(m.hintProviders, h) + m.scope.AddHintProvider(h) } func (m *manager) AddContainer(pod *v1.Pod, containerID string) error { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.podMap[containerID] = string(pod.UID) - return nil + return m.scope.AddContainer(pod, containerID) } func (m *manager) RemoveContainer(containerID string) error { - m.mutex.Lock() - defer m.mutex.Unlock() - - klog.Infof("[topologymanager] RemoveContainer - Container ID: %v", containerID) - podUIDString := m.podMap[containerID] - delete(m.podMap, containerID) - if _, exists := m.podTopologyHints[podUIDString]; exists { - delete(m.podTopologyHints[podUIDString], containerID) - if len(m.podTopologyHints[podUIDString]) == 0 { - delete(m.podTopologyHints, podUIDString) - } - } - - return nil + return m.scope.RemoveContainer(containerID) } func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { klog.Infof("[topologymanager] Topology Admit Handler") pod := attrs.Pod - for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { - if m.policy.Name() == PolicyNone { - err := m.allocateAlignedResources(pod, &container) - if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } - } - continue - } - - result, admit := m.calculateAffinity(pod, &container) - if !admit { - return lifecycle.PodAdmitResult{ - Message: "Resources cannot be allocated with Topology locality", - Reason: "TopologyAffinityError", - Admit: false, - } - } - - klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", pod.UID, container.Name, result) - if m.podTopologyHints[string(pod.UID)] == nil { - m.podTopologyHints[string(pod.UID)] = make(map[string]TopologyHint) - } - m.podTopologyHints[string(pod.UID)][container.Name] = result - - err := m.allocateAlignedResources(pod, &container) - if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } - } - } - - return lifecycle.PodAdmitResult{Admit: true} + return m.scope.Admit(pod) }