move to cadvisor.MachineInfo

This patch removes GetNUMANodeInfo; cadvisor.MachineInfo is used
instead. GetNUMANodeInfo was introduced because the meaning of
MachineInfo.Topology differed by architecture: on ARM it represented NUMA
nodes, but on x86 it represented sockets (since it was read from
/proc/cpuinfo). Now the meaning is unified and MachineInfo.Topology
represents NUMA nodes.

Signed-off-by: Alexey Perevalov <alexey.perevalov@huawei.com>
This commit is contained in:
Alexey Perevalov 2020-05-10 19:14:48 +03:00
parent e33ba9e974
commit a047e8aa1b
12 changed files with 65 additions and 154 deletions

View File

@ -51,7 +51,6 @@ go_library(
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//pkg/kubelet/cm/util:go_default_library",
"//pkg/kubelet/events:go_default_library",
@ -103,7 +102,6 @@ go_library(
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//pkg/kubelet/cm/util:go_default_library",
"//pkg/kubelet/events:go_default_library",

View File

@ -52,7 +52,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
@ -238,13 +237,6 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
if err != nil {
return nil, err
}
// Correct NUMA information is currently missing from cadvisor's
// MachineInfo struct, so we use the CPUManager's internal logic for
// gathering NUMANodeInfo to pass to components that care about it.
numaNodeInfo, err := cputopology.GetNUMANodeInfo()
if err != nil {
return nil, err
}
capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
for k, v := range capacity {
internalCapacity[k] = v
@ -300,7 +292,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
cm.topologyManager, err = topologymanager.NewManager(
numaNodeInfo,
machineInfo.Topology,
nodeConfig.ExperimentalTopologyManagerPolicy,
)
@ -315,7 +307,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
cm.deviceManager, err = devicemanager.NewManagerStub()
@ -330,7 +322,6 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
numaNodeInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,

View File

@ -126,7 +126,7 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var topo *topology.CPUTopology
var policy Policy
@ -137,7 +137,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
case PolicyStatic:
var err error
topo, err = topology.Discover(machineInfo, numaNodeInfo)
topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}

View File

@ -629,7 +629,7 @@ func TestCPUManagerGenerate(t *testing.T) {
}
defer os.RemoveAll(sDir)
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, nil, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())

View File

@ -33,8 +33,5 @@ go_test(
name = "go_default_test",
srcs = ["topology_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
],
deps = ["//vendor/github.com/google/cadvisor/info/v1:go_default_library"],
)

View File

@ -18,8 +18,6 @@ package topology
import (
"fmt"
"io/ioutil"
"strings"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/klog/v2"
@ -218,7 +216,7 @@ func (d CPUDetails) CPUsInCores(ids ...int) cpuset.CPUSet {
}
// Discover returns CPUTopology based on cadvisor node info
func Discover(machineInfo *cadvisorapi.MachineInfo, numaNodeInfo NUMANodeInfo) (*CPUTopology, error) {
func Discover(machineInfo *cadvisorapi.MachineInfo) (*CPUTopology, error) {
if machineInfo.NumCores == 0 {
return nil, fmt.Errorf("could not detect number of cpus")
}
@ -226,26 +224,20 @@ func Discover(machineInfo *cadvisorapi.MachineInfo, numaNodeInfo NUMANodeInfo) (
CPUDetails := CPUDetails{}
numPhysicalCores := 0
for _, socket := range machineInfo.Topology {
numPhysicalCores += len(socket.Cores)
for _, core := range socket.Cores {
for _, node := range machineInfo.Topology {
numPhysicalCores += len(node.Cores)
for _, core := range node.Cores {
if coreID, err := getUniqueCoreID(core.Threads); err == nil {
for _, cpu := range core.Threads {
numaNodeID := 0
for id, cset := range numaNodeInfo {
if cset.Contains(cpu) {
numaNodeID = id
}
}
CPUDetails[cpu] = CPUInfo{
CoreID: coreID,
SocketID: socket.Id,
NUMANodeID: numaNodeID,
SocketID: core.SocketID,
NUMANodeID: node.Id,
}
}
} else {
klog.Errorf("could not get unique coreID for socket: %d core %d threads: %v",
socket.Id, core.Id, core.Threads)
core.SocketID, core.Id, core.Threads)
return nil, err
}
}
@ -280,49 +272,3 @@ func getUniqueCoreID(threads []int) (coreID int, err error) {
return min, nil
}
// GetNUMANodeInfo uses sysfs to return a map of NUMANode id to the list of
// CPUs associated with that NUMANode.
//
// TODO: This is a temporary workaround until cadvisor provides this
// information directly in machineInfo. We should remove this once this
// information is available from cadvisor.
func GetNUMANodeInfo() (NUMANodeInfo, error) {
// Get the possible NUMA nodes on this machine. If reading this file
// is not possible, this is not an error. Instead, we just return a
// nil NUMANodeInfo, indicating that no NUMA information is available
// on this machine. This should implicitly be interpreted as having a
// single NUMA node with id 0 for all CPUs.
nodelist, err := ioutil.ReadFile("/sys/devices/system/node/online")
if err != nil {
return nil, nil
}
// Parse the nodelist into a set of Node IDs
nodes, err := cpuset.Parse(strings.TrimSpace(string(nodelist)))
if err != nil {
return nil, err
}
info := make(NUMANodeInfo)
// For each node...
for _, node := range nodes.ToSlice() {
// Read the 'cpulist' of the NUMA node from sysfs.
path := fmt.Sprintf("/sys/devices/system/node/node%d/cpulist", node)
cpulist, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
// Convert the 'cpulist' into a set of CPUs.
cpus, err := cpuset.Parse(strings.TrimSpace(string(cpulist)))
if err != nil {
return nil, err
}
info[node] = cpus
}
return info, nil
}

View File

@ -21,26 +21,23 @@ import (
"testing"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
func Test_Discover(t *testing.T) {
tests := []struct {
name string
machineInfo cadvisorapi.MachineInfo
numaNodeInfo NUMANodeInfo
want *CPUTopology
wantErr bool
name string
machineInfo cadvisorapi.MachineInfo
want *CPUTopology
wantErr bool
}{
{
name: "FailNumCores",
machineInfo: cadvisorapi.MachineInfo{
NumCores: 0,
},
numaNodeInfo: NUMANodeInfo{},
want: &CPUTopology{},
wantErr: true,
want: &CPUTopology{},
wantErr: true,
},
{
name: "OneSocketHT",
@ -49,17 +46,14 @@ func Test_Discover(t *testing.T) {
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0, 4}},
{Id: 1, Threads: []int{1, 5}},
{Id: 2, Threads: []int{2, 6}},
{Id: 3, Threads: []int{3, 7}},
{SocketID: 0, Id: 0, Threads: []int{0, 4}},
{SocketID: 0, Id: 1, Threads: []int{1, 5}},
{SocketID: 0, Id: 2, Threads: []int{2, 6}},
{SocketID: 0, Id: 3, Threads: []int{3, 7}},
},
},
},
},
numaNodeInfo: NUMANodeInfo{
0: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
},
want: &CPUTopology{
NumCPUs: 8,
NumSockets: 1,
@ -84,22 +78,18 @@ func Test_Discover(t *testing.T) {
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0}},
{Id: 2, Threads: []int{2}},
{SocketID: 0, Id: 0, Threads: []int{0}},
{SocketID: 0, Id: 2, Threads: []int{2}},
},
},
{Id: 1,
Cores: []cadvisorapi.Core{
{Id: 1, Threads: []int{1}},
{Id: 3, Threads: []int{3}},
{SocketID: 1, Id: 1, Threads: []int{1}},
{SocketID: 1, Id: 3, Threads: []int{3}},
},
},
},
},
numaNodeInfo: NUMANodeInfo{
0: cpuset.NewCPUSet(0, 2),
1: cpuset.NewCPUSet(1, 3),
},
want: &CPUTopology{
NumCPUs: 4,
NumSockets: 2,
@ -120,24 +110,20 @@ func Test_Discover(t *testing.T) {
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0, 6}},
{Id: 1, Threads: []int{1, 7}},
{Id: 2, Threads: []int{2, 8}},
{SocketID: 0, Id: 0, Threads: []int{0, 6}},
{SocketID: 0, Id: 1, Threads: []int{1, 7}},
{SocketID: 0, Id: 2, Threads: []int{2, 8}},
},
},
{Id: 1,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{3, 9}},
{Id: 1, Threads: []int{4, 10}},
{Id: 2, Threads: []int{5, 11}},
{SocketID: 1, Id: 0, Threads: []int{3, 9}},
{SocketID: 1, Id: 1, Threads: []int{4, 10}},
{SocketID: 1, Id: 2, Threads: []int{5, 11}},
},
},
},
},
numaNodeInfo: NUMANodeInfo{
0: cpuset.NewCPUSet(0, 6, 1, 7, 2, 8),
1: cpuset.NewCPUSet(3, 9, 4, 10, 5, 11),
},
want: &CPUTopology{
NumCPUs: 12,
NumSockets: 2,
@ -166,17 +152,16 @@ func Test_Discover(t *testing.T) {
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0, 4}},
{Id: 1, Threads: []int{1, 5}},
{Id: 2, Threads: []int{2, 2}}, // Wrong case - should fail here
{Id: 3, Threads: []int{3, 7}},
{SocketID: 0, Id: 0, Threads: []int{0, 4}},
{SocketID: 0, Id: 1, Threads: []int{1, 5}},
{SocketID: 0, Id: 2, Threads: []int{2, 2}}, // Wrong case - should fail here
{SocketID: 0, Id: 3, Threads: []int{3, 7}},
},
},
},
},
numaNodeInfo: NUMANodeInfo{},
want: &CPUTopology{},
wantErr: true,
want: &CPUTopology{},
wantErr: true,
},
{
name: "OneSocketHT fail",
@ -185,22 +170,21 @@ func Test_Discover(t *testing.T) {
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0, 4}},
{Id: 1, Threads: []int{1, 5}},
{Id: 2, Threads: []int{2, 6}},
{Id: 3, Threads: []int{}}, // Wrong case - should fail here
{SocketID: 0, Id: 0, Threads: []int{0, 4}},
{SocketID: 0, Id: 1, Threads: []int{1, 5}},
{SocketID: 0, Id: 2, Threads: []int{2, 6}},
{SocketID: 0, Id: 3, Threads: []int{}}, // Wrong case - should fail here
},
},
},
},
numaNodeInfo: NUMANodeInfo{},
want: &CPUTopology{},
wantErr: true,
want: &CPUTopology{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := Discover(&tt.machineInfo, tt.numaNodeInfo)
got, err := Discover(&tt.machineInfo)
if err != nil {
if tt.wantErr {
t.Logf("Discover() expected error = %v", err)

View File

@ -50,26 +50,21 @@ func TestGetTopologyHints(t *testing.T) {
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0, 6}},
{Id: 1, Threads: []int{1, 7}},
{Id: 2, Threads: []int{2, 8}},
{SocketID: 0, Id: 0, Threads: []int{0, 6}},
{SocketID: 0, Id: 1, Threads: []int{1, 7}},
{SocketID: 0, Id: 2, Threads: []int{2, 8}},
},
},
{Id: 1,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{3, 9}},
{Id: 1, Threads: []int{4, 10}},
{Id: 2, Threads: []int{5, 11}},
{SocketID: 1, Id: 0, Threads: []int{3, 9}},
{SocketID: 1, Id: 1, Threads: []int{4, 10}},
{SocketID: 1, Id: 2, Threads: []int{5, 11}},
},
},
},
}
numaNodeInfo := topology.NUMANodeInfo{
0: cpuset.NewCPUSet(0, 6, 1, 7, 2, 8),
1: cpuset.NewCPUSet(3, 9, 4, 10, 5, 11),
}
tcases := []struct {
name string
pod v1.Pod
@ -237,7 +232,7 @@ func TestGetTopologyHints(t *testing.T) {
},
}
for _, tc := range tcases {
topology, _ := topology.Discover(&machineInfo, numaNodeInfo)
topology, _ := topology.Discover(&machineInfo)
var activePods []*v1.Pod
for p := range tc.assignments {

View File

@ -19,7 +19,6 @@ go_library(
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/cm/topologymanager/bitmask:go_default_library",
@ -37,6 +36,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/pluginregistration/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],

View File

@ -26,6 +26,7 @@ import (
"sync"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"google.golang.org/grpc"
"k8s.io/klog/v2"
@ -40,7 +41,6 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -124,11 +124,11 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManagerImpl creates a new manager.
func NewManagerImpl(numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket, numaNodeInfo, topologyAffinityStore)
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket, topology, topologyAffinityStore)
}
func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) {
@ -136,8 +136,8 @@ func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, to
}
var numaNodes []int
for node := range numaNodeInfo {
numaNodes = append(numaNodes, node)
for _, node := range topology {
numaNodes = append(numaNodes, node.Id)
}
dir, file := filepath.Split(socketPath)

View File

@ -14,10 +14,10 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/topologymanager/bitmask:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -20,9 +20,9 @@ import (
"fmt"
"sync"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
@ -122,12 +122,12 @@ func (th *TopologyHint) LessThan(other TopologyHint) bool {
var _ Manager = &manager{}
//NewManager creates a new TopologyManager based on provided policy
func NewManager(numaNodeInfo cputopology.NUMANodeInfo, topologyPolicyName string) (Manager, error) {
func NewManager(topology []cadvisorapi.Node, topologyPolicyName string) (Manager, error) {
klog.Infof("[topologymanager] Creating topology manager with %s policy", topologyPolicyName)
var numaNodes []int
for node := range numaNodeInfo {
numaNodes = append(numaNodes, node)
for _, node := range topology {
numaNodes = append(numaNodes, node.Id)
}
if topologyPolicyName != PolicyNone && len(numaNodes) > maxAllowableNUMANodes {