chore(kubelet): migrate topologymanager to contextual logging

This commit is contained in:
phuhung273
2025-10-14 18:30:52 +07:00
parent 9b9cd768a0
commit 7ed61cfde1
23 changed files with 161 additions and 71 deletions

View File

@@ -239,6 +239,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*

View File

@@ -253,6 +253,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*

View File

@@ -66,6 +66,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*

View File

@@ -206,6 +206,10 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
// Use klog.TODO() because we currently do not have a proper logger to pass in.
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
logger := klog.TODO()
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
@@ -308,7 +312,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
if err != nil {
return nil, err
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
cm.topologyManager.AddHintProvider(logger, cm.deviceManager)
// Initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
@@ -336,7 +340,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
cm.topologyManager.AddHintProvider(logger, cm.cpuManager)
cm.memoryManager, err = memorymanager.NewManager(
context.TODO(),
@@ -351,7 +355,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.ErrorS(err, "Failed to initialize memory manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
cm.topologyManager.AddHintProvider(logger, cm.memoryManager)
// Create a single channel for all resource updates. This channel is consumed
// by the Kubelet's main sync loop.

View File

@@ -135,6 +135,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cadvisorInterface: cadvisorInterface,
}
// Use klog.TODO() because we currently do not have a proper logger to pass in.
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
logger := klog.TODO()
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
cm.memoryManager = memorymanager.NewFakeManager(context.TODO())
@@ -165,7 +169,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
cm.topologyManager.AddHintProvider(logger, cm.cpuManager)
klog.InfoS("Creating memory manager")
cm.memoryManager, err = memorymanager.NewManager(
@@ -181,7 +185,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.ErrorS(err, "Failed to initialize memory manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
cm.topologyManager.AddHintProvider(logger, cm.memoryManager)
}
klog.InfoS("Creating device plugin manager")
@@ -189,7 +193,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
if err != nil {
return nil, err
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
cm.topologyManager.AddHintProvider(logger, cm.deviceManager)
return cm, nil
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package topologymanager
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
@@ -30,13 +32,17 @@ type fakeManager struct {
// NewFakeManager returns an instance of FakeManager
func NewFakeManager() Manager {
klog.InfoS("NewFakeManager")
// Use klog.TODO() because changing NewManager requires changes in too many other components
logger := klog.TODO()
logger.Info("NewFakeManager")
return &fakeManager{}
}
// NewFakeManagerWithHint returns an instance of fake topology manager with specified topology hints
func NewFakeManagerWithHint(hint *TopologyHint) Manager {
klog.InfoS("NewFakeManagerWithHint")
// Use klog.TODO() because changing NewManager requires changes in too many other components
logger := klog.TODO()
logger.Info("NewFakeManagerWithHint")
return &fakeManager{
hint: hint,
policy: NewNonePolicy(),
@@ -45,14 +51,20 @@ func NewFakeManagerWithHint(hint *TopologyHint) Manager {
// NewFakeManagerWithPolicy returns an instance of fake topology manager with specified policy
func NewFakeManagerWithPolicy(policy Policy) Manager {
klog.InfoS("NewFakeManagerWithPolicy", "policy", policy.Name())
// Use klog.TODO() because changing NewManager requires changes in too many other components
logger := klog.TODO()
logger.Info("NewFakeManagerWithPolicy", "policy", policy.Name())
return &fakeManager{
policy: policy,
}
}
func (m *fakeManager) GetAffinity(podUID string, containerName string) TopologyHint {
klog.InfoS("GetAffinity", "podUID", podUID, "containerName", containerName)
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("GetAffinity", "podUID", podUID, "containerName", containerName)
if m.hint == nil {
return TopologyHint{}
}
@@ -64,20 +76,32 @@ func (m *fakeManager) GetPolicy() Policy {
return m.policy
}
func (m *fakeManager) AddHintProvider(h HintProvider) {
klog.InfoS("AddHintProvider", "hintProvider", h)
func (m *fakeManager) AddHintProvider(logger klog.Logger, h HintProvider) {
logger.Info("AddHintProvider", "hintProvider", h)
}
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
klog.InfoS("AddContainer", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID)
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("AddContainer", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID)
}
func (m *fakeManager) RemoveContainer(containerID string) error {
klog.InfoS("RemoveContainer", "containerID", containerID)
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("RemoveContainer", "containerID", containerID)
return nil
}
func (m *fakeManager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
klog.InfoS("Topology Admit Handler")
// TODO: create context here as changing interface https://github.com/kubernetes/kubernetes/blob/09aaf7226056a7964adcb176d789de5507313d00/pkg/kubelet/lifecycle/interfaces.go#L43
// requires changes in too many other components
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("Topology Admit Handler")
return admission.GetPodAdmitResult(nil)
}

View File

@@ -27,7 +27,7 @@ type Policy interface {
Name() string
// Returns a merged TopologyHint based on input from hint providers
// and a Pod Admit Handler Response based on hints and policy type
Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool)
Merge(logger klog.Logger, providersHints []map[string][]TopologyHint) (TopologyHint, bool)
}
// IsAlignmentGuaranteed return true if the given policy guarantees that either
@@ -68,7 +68,7 @@ func mergePermutation(defaultAffinity bitmask.BitMask, permutation []TopologyHin
return TopologyHint{mergedAffinity, preferred}
}
func filterProvidersHints(providersHints []map[string][]TopologyHint) [][]TopologyHint {
func filterProvidersHints(logger klog.Logger, providersHints []map[string][]TopologyHint) [][]TopologyHint {
// Loop through all hint providers and save an accumulated list of the
// hints returned by each hint provider. If no hints are provided, assume
// that provider has no preference for topology-aware allocation.
@@ -76,7 +76,7 @@ func filterProvidersHints(providersHints []map[string][]TopologyHint) [][]Topolo
for _, hints := range providersHints {
// If hints is nil, insert a single, preferred any-numa hint into allProviderHints.
if len(hints) == 0 {
klog.InfoS("Hint Provider has no preference for NUMA affinity with any resource")
logger.Info("Hint Provider has no preference for NUMA affinity with any resource")
allProviderHints = append(allProviderHints, []TopologyHint{{nil, true}})
continue
}
@@ -84,13 +84,13 @@ func filterProvidersHints(providersHints []map[string][]TopologyHint) [][]Topolo
// Otherwise, accumulate the hints for each resource type into allProviderHints.
for resource := range hints {
if hints[resource] == nil {
klog.InfoS("Hint Provider has no preference for NUMA affinity with resource", "resource", resource)
logger.Info("Hint Provider has no preference for NUMA affinity with resource", "resource", resource)
allProviderHints = append(allProviderHints, []TopologyHint{{nil, true}})
continue
}
if len(hints[resource]) == 0 {
klog.InfoS("Hint Provider has no possible NUMA affinities for resource", "resource", resource)
logger.Info("Hint Provider has no possible NUMA affinities for resource", "resource", resource)
allProviderHints = append(allProviderHints, []TopologyHint{{nil, false}})
continue
}

View File

@@ -16,6 +16,8 @@ limitations under the License.
package topologymanager
import "k8s.io/klog/v2"
type bestEffortPolicy struct {
// numaInfo represents list of NUMA Nodes available on the underlying machine and distances between them
numaInfo *NUMAInfo
@@ -40,8 +42,8 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *TopologyHint) bool {
return true
}
func (p *bestEffortPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
func (p *bestEffortPolicy) Merge(logger klog.Logger, providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(logger, providersHints)
merger := NewHintMerger(p.numaInfo, filteredHints, p.Name(), p.opts)
bestHint := merger.Merge()
admit := p.canAdmitPodResult(&bestHint)

View File

@@ -16,6 +16,8 @@ limitations under the License.
package topologymanager
import "k8s.io/klog/v2"
type nonePolicy struct{}
var _ Policy = &nonePolicy{}
@@ -36,6 +38,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *TopologyHint) bool {
return true
}
func (p *nonePolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
func (p *nonePolicy) Merge(logger klog.Logger, providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
return TopologyHint{}, p.canAdmitPodResult(nil)
}

View File

@@ -18,6 +18,8 @@ package topologymanager
import (
"testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestPolicyNoneName(t *testing.T) {
@@ -101,9 +103,11 @@ func TestPolicyNoneMerge(t *testing.T) {
},
}
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
policy := NewNonePolicy()
result, admit := policy.Merge(tc.providersHints)
result, admit := policy.Merge(logger, tc.providersHints)
if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit {
t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result)
}

View File

@@ -62,7 +62,7 @@ type PolicyOptions struct {
MaxAllowableNUMANodes int
}
func NewPolicyOptions(policyOptions map[string]string) (PolicyOptions, error) {
func NewPolicyOptions(logger klog.Logger, policyOptions map[string]string) (PolicyOptions, error) {
opts := PolicyOptions{
// Set MaxAllowableNUMANodes to the default. This will be overwritten
// if the user has specified a policy option for MaxAllowableNUMANodes.
@@ -92,7 +92,7 @@ func NewPolicyOptions(policyOptions map[string]string) (PolicyOptions, error) {
}
if optValue > defaultMaxAllowableNUMANodes {
klog.InfoS("WARNING: the value of max-allowable-numa-nodes is more than the default recommended value", "max-allowable-numa-nodes", optValue, "defaultMaxAllowableNUMANodes", defaultMaxAllowableNUMANodes)
logger.Info("WARNING: the value of max-allowable-numa-nodes is more than the default recommended value", "max-allowable-numa-nodes", optValue, "defaultMaxAllowableNUMANodes", defaultMaxAllowableNUMANodes)
}
opts.MaxAllowableNUMANodes = optValue
default:

View File

@@ -25,6 +25,7 @@ import (
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/utils/ktesting"
)
var fancyBetaOption = "fancy-new-option"
@@ -143,12 +144,14 @@ func TestNewTopologyManagerOptions(t *testing.T) {
betaOptions.Insert(fancyBetaOption)
alphaOptions.Insert(fancyAlphaOption)
logger, _ := ktesting.NewTestContext(t)
for _, tcase := range testCases {
t.Run(tcase.description, func(t *testing.T) {
if tcase.featureGate != "" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, tcase.featureGate, tcase.featureGateEnable)
}
opts, err := NewPolicyOptions(tcase.policyOptions)
opts, err := NewPolicyOptions(logger, tcase.policyOptions)
if tcase.expectedErr != nil {
if !strings.Contains(err.Error(), tcase.expectedErr.Error()) {
t.Errorf("Unexpected error message. Have: %s, wants %s", err.Error(), tcase.expectedErr.Error())

View File

@@ -16,6 +16,8 @@ limitations under the License.
package topologymanager
import "k8s.io/klog/v2"
type restrictedPolicy struct {
bestEffortPolicy
}
@@ -38,8 +40,8 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *TopologyHint) bool {
return hint.Preferred
}
func (p *restrictedPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
func (p *restrictedPolicy) Merge(logger klog.Logger, providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(logger, providersHints)
merger := NewHintMerger(p.numaInfo, filteredHints, p.Name(), p.opts)
bestHint := merger.Merge()
admit := p.canAdmitPodResult(&bestHint)

View File

@@ -16,6 +16,8 @@ limitations under the License.
package topologymanager
import "k8s.io/klog/v2"
type singleNumaNodePolicy struct {
// numaInfo represents list of NUMA Nodes available on the underlying machine and distances between them
numaInfo *NUMAInfo
@@ -58,8 +60,8 @@ func filterSingleNumaHints(allResourcesHints [][]TopologyHint) [][]TopologyHint
return filteredResourcesHints
}
func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
func (p *singleNumaNodePolicy) Merge(logger klog.Logger, providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(logger, providersHints)
// Filter to only include don't cares and hints with a single NUMA node.
singleNumaHints := filterSingleNumaHints(filteredHints)

View File

@@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/test/utils/ktesting"
)
type policyMergeTestCase struct {
@@ -1273,6 +1274,8 @@ func (p *singleNumaNodePolicy) mergeTestCases(numaNodes []int) []policyMergeTest
}
func testPolicyMerge(policy Policy, tcases []policyMergeTestCase, t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
var providersHints []map[string][]TopologyHint
for _, provider := range tc.hp {
@@ -1280,7 +1283,7 @@ func testPolicyMerge(policy Policy, tcases []policyMergeTestCase, t *testing.T)
providersHints = append(providersHints, hints)
}
actual, _ := policy.Merge(providersHints)
actual, _ := policy.Merge(logger, providersHints)
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("%v: Expected Topology Hint to be %v, got %v:", tc.name, tc.expected, actual)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package topologymanager
import (
"context"
"sync"
"k8s.io/api/core/v1"
@@ -41,7 +42,7 @@ type podTopologyHints map[string]map[string]TopologyHint
type Scope interface {
Name() string
GetPolicy() Policy
Admit(pod *v1.Pod) lifecycle.PodAdmitResult
Admit(ctx context.Context, pod *v1.Pod) lifecycle.PodAdmitResult
// AddHintProvider adds a hint provider to manager to indicate the hint provider
// wants to be consulted with when making topology hints
AddHintProvider(h HintProvider)
@@ -111,10 +112,14 @@ func (s *scope) AddContainer(pod *v1.Pod, container *v1.Container, containerID s
// It would be better to implement this function in topologymanager instead of scope
// but topologymanager does not track the mapping anymore
func (s *scope) RemoveContainer(containerID string) error {
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
logger := klog.FromContext(ctx)
s.mutex.Lock()
defer s.mutex.Unlock()
klog.InfoS("RemoveContainer", "containerID", containerID)
logger.Info("RemoveContainer", "containerID", containerID)
// Get the podUID and containerName associated with the containerID to be removed and remove it
podUIDString, containerName, err := s.podMap.GetContainerRef(containerID)
if err != nil {

View File

@@ -17,6 +17,8 @@ limitations under the License.
package topologymanager
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
@@ -44,10 +46,12 @@ func NewContainerScope(policy Policy) Scope {
}
}
func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
func (s *containerScope) Admit(ctx context.Context, pod *v1.Pod) lifecycle.PodAdmitResult {
logger := klog.FromContext(ctx)
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
bestHint, admit := s.calculateAffinity(pod, &container)
klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
bestHint, admit := s.calculateAffinity(logger, pod, &container)
logger.Info("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
if !admit {
if IsAlignmentGuaranteed(s.policy) {
@@ -56,7 +60,7 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
metrics.TopologyManagerAdmissionErrorsTotal.Inc()
return admission.GetPodAdmitResult(&TopologyAffinityError{})
}
klog.InfoS("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
logger.Info("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
s.setTopologyHints(string(pod.UID), container.Name, bestHint)
err := s.allocateAlignedResources(pod, &container)
@@ -66,28 +70,28 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
}
if IsAlignmentGuaranteed(s.policy) {
klog.V(4).InfoS("Resource alignment at container scope guaranteed", "pod", klog.KObj(pod))
logger.V(4).Info("Resource alignment at container scope guaranteed", "pod", klog.KObj(pod))
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedNUMANode).Inc()
}
}
return admission.GetPodAdmitResult(nil)
}
func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint {
func (s *containerScope) accumulateProvidersHints(logger klog.Logger, pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint {
var providersHints []map[string][]TopologyHint
for _, provider := range s.hintProviders {
// Get the TopologyHints for a Container from a provider.
hints := provider.GetTopologyHints(pod, container)
providersHints = append(providersHints, hints)
klog.InfoS("TopologyHints", "hints", hints, "pod", klog.KObj(pod), "containerName", container.Name)
logger.Info("TopologyHints", "hints", hints, "pod", klog.KObj(pod), "containerName", container.Name)
}
return providersHints
}
func (s *containerScope) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
providersHints := s.accumulateProvidersHints(pod, container)
bestHint, admit := s.policy.Merge(providersHints)
klog.InfoS("ContainerTopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
func (s *containerScope) calculateAffinity(logger klog.Logger, pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
providersHints := s.accumulateProvidersHints(logger, pod, container)
bestHint, admit := s.policy.Merge(logger, providersHints)
logger.Info("ContainerTopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
return bestHint, admit
}

View File

@@ -21,6 +21,7 @@ import (
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestContainerCalculateAffinity(t *testing.T) {
@@ -121,6 +122,8 @@ func TestContainerCalculateAffinity(t *testing.T) {
},
}
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
ctnScope := &containerScope{
scope{
@@ -130,7 +133,7 @@ func TestContainerCalculateAffinity(t *testing.T) {
},
}
ctnScope.calculateAffinity(&v1.Pod{}, &v1.Container{})
ctnScope.calculateAffinity(logger, &v1.Pod{}, &v1.Container{})
actual := ctnScope.policy.(*mockPolicy).ph
if !reflect.DeepEqual(tc.expected, actual) {
t.Errorf("Test Case: %s", tc.name)
@@ -254,13 +257,15 @@ func TestContainerAccumulateProvidersHints(t *testing.T) {
},
}
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
ctnScope := containerScope{
scope{
hintProviders: tc.hp,
},
}
actual := ctnScope.accumulateProvidersHints(&v1.Pod{}, &v1.Container{})
actual := ctnScope.accumulateProvidersHints(logger, &v1.Pod{}, &v1.Container{})
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Test Case %s: Expected NUMANodeAffinity in result to be %v, got %v", tc.name, tc.expected, actual)
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package topologymanager
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -41,6 +43,6 @@ func NewNoneScope() Scope {
}
}
func (s *noneScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
func (s *noneScope) Admit(ctx context.Context, pod *v1.Pod) lifecycle.PodAdmitResult {
return s.admitPolicyNone(pod)
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package topologymanager
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
@@ -44,9 +46,11 @@ func NewPodScope(policy Policy) Scope {
}
}
func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
bestHint, admit := s.calculateAffinity(pod)
klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod))
func (s *podScope) Admit(ctx context.Context, pod *v1.Pod) lifecycle.PodAdmitResult {
logger := klog.FromContext(ctx)
bestHint, admit := s.calculateAffinity(logger, pod)
logger.Info("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod))
if !admit {
if IsAlignmentGuaranteed(s.policy) {
// increment only if we know we allocate aligned resources.
@@ -57,7 +61,7 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
}
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
klog.InfoS("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
logger.Info("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
s.setTopologyHints(string(pod.UID), container.Name, bestHint)
err := s.allocateAlignedResources(pod, &container)
@@ -68,27 +72,27 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
}
if IsAlignmentGuaranteed(s.policy) {
// increment only if we know we allocate aligned resources.
klog.V(4).InfoS("Resource alignment at pod scope guaranteed", "pod", klog.KObj(pod))
logger.V(4).Info("Resource alignment at pod scope guaranteed", "pod", klog.KObj(pod))
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopePod, metrics.AlignedNUMANode).Inc()
}
return admission.GetPodAdmitResult(nil)
}
func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint {
func (s *podScope) accumulateProvidersHints(logger klog.Logger, pod *v1.Pod) []map[string][]TopologyHint {
var providersHints []map[string][]TopologyHint
for _, provider := range s.hintProviders {
// Get the TopologyHints for a Pod from a provider.
hints := provider.GetPodTopologyHints(pod)
providersHints = append(providersHints, hints)
klog.InfoS("TopologyHints", "hints", hints, "pod", klog.KObj(pod))
logger.Info("TopologyHints", "hints", hints, "pod", klog.KObj(pod))
}
return providersHints
}
func (s *podScope) calculateAffinity(pod *v1.Pod) (TopologyHint, bool) {
providersHints := s.accumulateProvidersHints(pod)
bestHint, admit := s.policy.Merge(providersHints)
klog.InfoS("PodTopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod))
func (s *podScope) calculateAffinity(logger klog.Logger, pod *v1.Pod) (TopologyHint, bool) {
providersHints := s.accumulateProvidersHints(logger, pod)
bestHint, admit := s.policy.Merge(logger, providersHints)
logger.Info("PodTopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod))
return bestHint, admit
}

View File

@@ -21,6 +21,7 @@ import (
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestPodCalculateAffinity(t *testing.T) {
@@ -121,6 +122,8 @@ func TestPodCalculateAffinity(t *testing.T) {
},
}
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
podScope := &podScope{
scope{
@@ -130,7 +133,7 @@ func TestPodCalculateAffinity(t *testing.T) {
},
}
podScope.calculateAffinity(&v1.Pod{})
podScope.calculateAffinity(logger, &v1.Pod{})
actual := podScope.policy.(*mockPolicy).ph
if !reflect.DeepEqual(tc.expected, actual) {
t.Errorf("Test Case: %s", tc.name)
@@ -254,13 +257,15 @@ func TestPodAccumulateProvidersHints(t *testing.T) {
},
}
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
pScope := podScope{
scope{
hintProviders: tc.hp,
},
}
actual := pScope.accumulateProvidersHints(&v1.Pod{})
actual := pScope.accumulateProvidersHints(logger, &v1.Pod{})
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Test Case %s: Expected NUMANodeAffinity in result to be %v, got %v", tc.name, tc.expected, actual)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package topologymanager
import (
"context"
"fmt"
"time"
@@ -59,7 +60,7 @@ type Manager interface {
lifecycle.PodAdmitHandler
// AddHintProvider adds a hint provider to manager to indicate the hint provider
// wants to be consulted with when making topology hints
AddHintProvider(HintProvider)
AddHintProvider(logger klog.Logger, h HintProvider)
// AddContainer adds pod to Manager for tracking
AddContainer(pod *v1.Pod, container *v1.Container, containerID string)
// RemoveContainer removes pod from Manager tracking
@@ -133,18 +134,22 @@ var _ Manager = &manager{}
// NewManager creates a new TopologyManager based on provided policy and scope
func NewManager(topology []cadvisorapi.Node, topologyPolicyName string, topologyScopeName string, topologyPolicyOptions map[string]string) (Manager, error) {
// Use klog.TODO() because we currently do not have a proper logger to pass in.
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
logger := klog.TODO()
// When policy is none, the scope is not relevant, so we can short circuit here.
if topologyPolicyName == PolicyNone {
klog.InfoS("Creating topology manager with none policy")
logger.Info("Creating topology manager with none policy")
return &manager{scope: NewNoneScope()}, nil
}
opts, err := NewPolicyOptions(topologyPolicyOptions)
opts, err := NewPolicyOptions(logger, topologyPolicyOptions)
if err != nil {
return nil, err
}
klog.InfoS("Creating topology manager with policy per scope", "topologyPolicyName", topologyPolicyName, "topologyScopeName", topologyScopeName, "topologyPolicyOptions", opts)
logger.Info("Creating topology manager with policy per scope", "topologyPolicyName", topologyPolicyName, "topologyScopeName", topologyScopeName, "topologyPolicyOptions", opts)
numaInfo, err := NewNUMAInfo(topology, opts)
if err != nil {
@@ -209,7 +214,7 @@ func (m *manager) GetPolicy() Policy {
return m.scope.GetPolicy()
}
func (m *manager) AddHintProvider(h HintProvider) {
func (m *manager) AddHintProvider(_ klog.Logger, h HintProvider) {
m.scope.AddHintProvider(h)
}
@@ -222,13 +227,17 @@ func (m *manager) RemoveContainer(containerID string) error {
}
func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
klog.V(4).InfoS("Topology manager admission check", "pod", klog.KObj(attrs.Pod))
// TODO: create context here as changing interface https://github.com/kubernetes/kubernetes/blob/09aaf7226056a7964adcb176d789de5507313d00/pkg/kubelet/lifecycle/interfaces.go#L43
// requires changes in too many other components
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.V(4).Info("Topology manager admission check", "pod", klog.KObj(attrs.Pod))
metrics.TopologyManagerAdmissionRequestsTotal.Inc()
startTime := time.Now()
podAdmitResult := m.scope.Admit(attrs.Pod)
podAdmitResult := m.scope.Admit(ctx, attrs.Pod)
metrics.TopologyManagerAdmissionDuration.Observe(float64(time.Since(startTime).Milliseconds()))
klog.V(4).InfoS("Pod Admit Result", "Message", podAdmitResult.Message, "pod", klog.KObj(attrs.Pod))
logger.V(4).Info("Pod Admit Result", "Message", podAdmitResult.Message, "pod", klog.KObj(attrs.Pod))
return podAdmitResult
}

View File

@@ -22,11 +22,13 @@ import (
"testing"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/test/utils/ktesting"
)
func NewTestBitMask(sockets ...int) bitmask.BitMask {
@@ -226,7 +228,7 @@ type mockPolicy struct {
ph []map[string][]TopologyHint
}
func (p *mockPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
func (p *mockPolicy) Merge(logger klog.Logger, providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
p.ph = providersHints
return TopologyHint{}, true
}
@@ -247,9 +249,10 @@ func TestAddHintProvider(t *testing.T) {
}
mngr := manager{}
mngr.scope = NewContainerScope(NewNonePolicy())
logger, _ := ktesting.NewTestContext(t)
for _, tc := range tcases {
for _, hp := range tc.hp {
mngr.AddHintProvider(hp)
mngr.AddHintProvider(logger, hp)
}
if len(tc.hp) != len(mngr.scope.(*containerScope).hintProviders) {
t.Errorf("error")