Merge pull request #100176 from pacoxu/structured-log-kubelet-last
Kubelet migration to structured logs: cpumanager/{cpu_manager.go, fake_cpu_manager.go, policy_static.go}
Commit 1cd909606d
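
The migration is mechanical across all three files: printf-style klog.Infof/Warningf/Errorf calls become klog.InfoS/klog.ErrorS with a constant message plus key/value pairs, pods are identified via klog.KObj, and the error value moves into the first argument of ErrorS (nil when logging at error severity without an error). A minimal self-contained sketch of the pattern, assuming k8s.io/klog/v2; the fakePod type and all values below are illustrative, not kubelet code:

```go
// Sketch of the Infof -> InfoS migration pattern applied in this PR.
// Assumes k8s.io/klog/v2; fakePod is a hypothetical stand-in, not kubelet code.
package main

import (
	"errors"
	"flag"

	"k8s.io/klog/v2"
)

// fakePod satisfies klog.KMetadata (GetName/GetNamespace), which is all
// klog.KObj needs to render a "namespace/name" object reference.
type fakePod struct{ name, namespace string }

func (p fakePod) GetName() string      { return p.name }
func (p fakePod) GetNamespace() string { return p.namespace }

func main() {
	klog.InitFlags(nil)
	flag.Set("v", "4") // enable the verbosity-gated line below
	flag.Parse()
	defer klog.Flush()

	pod := fakePod{name: "nginx", namespace: "default"}

	// Before: values interpolated into the format string.
	klog.Infof("[cpumanager] Allocate (pod: %s, container: %s)", pod.GetName(), "app")

	// After: constant message plus structured key/value pairs.
	klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", "app")

	// Errors become the first argument of ErrorS instead of a %v verb;
	// pass nil to log at error severity without an error value.
	err := errors.New("not enough cpus available to satisfy request")
	klog.ErrorS(err, "Unable to allocate CPUs", "numCPUs", 2)
	klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", "uid-1234", "containerName", "app")

	// Verbosity-gated calls keep the same shape.
	klog.V(4).InfoS("ReconcileState: updating container", "containerID", "abc123")
}
```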
pkg/kubelet/cm/cpumanager/cpu_manager.go

@@ -155,7 +155,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 	if err != nil {
 		return nil, err
 	}
-	klog.Infof("[cpumanager] detected CPU topology: %v", topo)
+	klog.InfoS("Detected CPU topology", "topology", topo)
 
 	reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
 	if !ok {
@@ -196,8 +196,8 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 }
 
 func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
-	klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
-	klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
+	klog.InfoS("Starting CPU manager", "policy", m.policy.Name())
+	klog.InfoS("Reconciling", "reconcilePeriod", m.reconcilePeriod)
 	m.sourcesReady = sourcesReady
 	m.activePods = activePods
 	m.podStatusProvider = podStatusProvider
@@ -206,14 +206,14 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
 
 	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
 	if err != nil {
-		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
+		klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
 		return err
 	}
 	m.state = stateImpl
 
 	err = m.policy.Start(m.state)
 	if err != nil {
-		klog.Errorf("[cpumanager] policy start error: %v", err)
+		klog.ErrorS(err, "Policy start error")
 		return err
 	}
 
@@ -238,7 +238,7 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
 	// Call down into the policy to assign this container CPUs if required.
 	err := m.policy.Allocate(m.state, p, c)
 	if err != nil {
-		klog.Errorf("[cpumanager] Allocate error: %v", err)
+		klog.ErrorS(err, "Allocate error")
 		return err
 	}
 
@@ -257,7 +257,7 @@ func (m *manager) RemoveContainer(containerID string) error {
 
 	err := m.policyRemoveContainerByID(containerID)
 	if err != nil {
-		klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
+		klog.ErrorS(err, "RemoveContainer error")
 		return err
 	}
 
@@ -348,10 +348,10 @@ func (m *manager) removeStaleState() {
 	for podUID := range assignments {
 		for containerName := range assignments[podUID] {
 			if _, ok := activeContainers[podUID][containerName]; !ok {
-				klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
+				klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", podUID, "containerName", containerName)
				err := m.policyRemoveContainerByRef(podUID, containerName)
 				if err != nil {
-					klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err)
+					klog.ErrorS(err, "RemoveStaleState: failed to remove container", "podUID", podUID, "containerName", containerName)
 				}
 			}
 		}
@@ -366,7 +366,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 	for _, pod := range m.activePods() {
 		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
 		if !ok {
-			klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
+			klog.InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
 			failure = append(failure, reconciledContainer{pod.Name, "", ""})
 			continue
 		}
@@ -376,21 +376,21 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 		for _, container := range allContainers {
 			containerID, err := findContainerIDByName(&pstatus, container.Name)
 			if err != nil {
-				klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
+				klog.InfoS("ReconcileState: skipping container; ID not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
 				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
 				continue
 			}
 
 			cstatus, err := findContainerStatusByName(&pstatus, container.Name)
 			if err != nil {
-				klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
+				klog.InfoS("ReconcileState: skipping container; container status not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
 				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
 				continue
 			}
 
 			if cstatus.State.Waiting != nil ||
 				(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
-				klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
+				klog.InfoS("ReconcileState: skipping container; container still in the waiting state", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
 				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
 				continue
 			}
@@ -404,7 +404,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 				// was allocated.
 				_, _, err := m.containerMap.GetContainerRef(containerID)
 				if err == nil {
-					klog.Warningf("[cpumanager] reconcileState: ignoring terminated container (pod: %s, container id: %s)", pod.Name, containerID)
+					klog.InfoS("ReconcileState: ignoring terminated container", "pod", klog.KObj(pod), "containerID", containerID)
 				}
 				m.Unlock()
 				continue
@@ -419,15 +419,15 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 			cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
 			if cset.IsEmpty() {
 				// NOTE: This should not happen outside of tests.
-				klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
+				klog.InfoS("ReconcileState: skipping container; assigned cpuset is empty", "pod", klog.KObj(pod), "containerName", container.Name)
 				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
 				continue
 			}
 
-			klog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
+			klog.V(4).InfoS("ReconcileState: updating container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
 			err = m.updateContainerCPUSet(containerID, cset)
 			if err != nil {
-				klog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
+				klog.ErrorS(err, "ReconcileState: failed to update container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
 				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
 				continue
 			}
pkg/kubelet/cm/cpumanager/fake_cpu_manager.go

@@ -32,36 +32,36 @@ type fakeManager struct {
 }
 
 func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
-	klog.Info("[fake cpumanager] Start()")
+	klog.InfoS("Start()")
 	return nil
 }
 
 func (m *fakeManager) Policy() Policy {
-	klog.Info("[fake cpumanager] Policy()")
+	klog.InfoS("Policy()")
 	return NewNonePolicy()
 }
 
 func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
-	klog.Infof("[fake cpumanager] Allocate (pod: %s, container: %s", pod.Name, container.Name)
+	klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
 	return nil
 }
 
 func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
-	klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
+	klog.InfoS("AddContainer", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID)
 }
 
 func (m *fakeManager) RemoveContainer(containerID string) error {
-	klog.Infof("[fake cpumanager] RemoveContainer (container id: %s)", containerID)
+	klog.InfoS("RemoveContainer", "containerID", containerID)
 	return nil
 }
 
 func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
-	klog.Infof("[fake cpumanager] Get Container Topology Hints")
+	klog.InfoS("Get container topology hints")
 	return map[string][]topologymanager.TopologyHint{}
 }
 
 func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
-	klog.Infof("[fake cpumanager] Get Pod Topology Hints")
+	klog.InfoS("Get pod topology hints")
 	return map[string][]topologymanager.TopologyHint{}
 }
 
@@ -70,12 +70,12 @@ func (m *fakeManager) State() state.Reader {
 }
 
 func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
-	klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName)
+	klog.InfoS("GetCPUs", "podUID", podUID, "containerName", containerName)
 	return cpuset.CPUSet{}
 }
 
 func (m *fakeManager) GetAllocatableCPUs() cpuset.CPUSet {
-	klog.Infof("[fake cpumanager] Get Allocatable Cpus")
+	klog.InfoS("Get Allocatable CPUs")
 	return cpuset.CPUSet{}
 }
 
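fake_cpu_manager.go above is a no-op test double whose only behavior is logging each call, so its lines migrate the same way as the real manager's. A hypothetical trimmed-down version of that pattern (the two-method Manager interface here is invented for illustration; the real cpumanager interface is larger):

```go
// Hypothetical sketch of a logging no-op fake, in the spirit of fakeManager.
package main

import "k8s.io/klog/v2"

// Manager is a trimmed-down stand-in for the real cpumanager interface.
type Manager interface {
	Start() error
	RemoveContainer(containerID string) error
}

type fakeManager struct{}

// Each method logs the call with structured key/value pairs and returns a
// zero value, so tests can exercise callers without touching real CPUs.
func (m *fakeManager) Start() error {
	klog.InfoS("Start()")
	return nil
}

func (m *fakeManager) RemoveContainer(containerID string) error {
	klog.InfoS("RemoveContainer", "containerID", containerID)
	return nil
}

func main() {
	var m Manager = &fakeManager{}
	m.Start()
	m.RemoveContainer("abc123")
	klog.Flush()
}
```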
pkg/kubelet/cm/cpumanager/policy_static.go

@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 )
 
 // PolicyStatic is the name of the static policy
@@ -107,7 +106,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 		return nil, err
 	}
 
-	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
+	klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
 
 	return &staticPolicy{
 		topology: topology,
@@ -123,7 +122,7 @@ func (p *staticPolicy) Name() string {
 
 func (p *staticPolicy) Start(s state.State) error {
 	if err := p.validateState(s); err != nil {
-		klog.Errorf("[cpumanager] static policy invalid state: %v, please drain node and remove policy state file", err)
+		klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
 		return err
 	}
 	return nil
@@ -218,23 +217,23 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
 
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
 	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
-		klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", format.Pod(pod), container.Name)
+		klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
 		// container belongs in an exclusively allocated pool
 
 		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
 			p.updateCPUsToReuse(pod, container, cpuset)
-			klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", format.Pod(pod), container.Name)
+			klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
 			return nil
 		}
 
 		// Call Topology Manager to get the aligned socket affinity across all hint providers.
 		hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
-		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", format.Pod(pod), container.Name, hint)
+		klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
 
 		// Allocate CPUs according to the NUMA affinity contained in the hint.
 		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
 		if err != nil {
-			klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, format.Pod(pod), container.Name, err)
+			klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
 			return err
 		}
 		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
@@ -246,7 +245,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 }
 
 func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
-	klog.Infof("[cpumanager] static policy: RemoveContainer (pod: %s, container: %s)", podUID, containerName)
+	klog.InfoS("Static policy: RemoveContainer", "podUID", podUID, "containerName", containerName)
 	if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
 		s.Delete(podUID, containerName)
 		// Mutate the shared pool, adding released cpus.
@@ -256,7 +255,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 }
 
 func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
-	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
+	klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
 
 	allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs)
 
@@ -291,7 +290,7 @@
 	// Remove allocated CPUs from the shared CPUSet.
 	s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
 
-	klog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result)
+	klog.InfoS("AllocateCPUs", "result", result)
 	return result, nil
 }
 
@@ -353,7 +352,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
 	// kubelet restart, for example.
 	if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
 		if allocated.Size() != requested {
-			klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", format.Pod(pod), container.Name, requested, allocated.Size())
+			klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "requestedSize", requested, "allocatedSize", allocated.Size())
 			// An empty list of hints will be treated as a preference that cannot be satisfied.
 			// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
 			// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
@@ -361,7 +360,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
 				string(v1.ResourceCPU): {},
 			}
 		}
-		klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", format.Pod(pod), container.Name)
+		klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod), "containerName", container.Name)
 		return map[string][]topologymanager.TopologyHint{
 			string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, cpuset.CPUSet{}, requested),
 		}
@@ -376,7 +375,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
 
 	// Generate hints.
 	cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
-	klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", format.Pod(pod), container.Name, cpuHints)
+	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "containerName", container.Name, "cpuHints", cpuHints)
 
 	return map[string][]topologymanager.TopologyHint{
 		string(v1.ResourceCPU): cpuHints,
@@ -403,7 +402,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
 	// kubelet restart, for example.
 	if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
 		if allocated.Size() != requestedByContainer {
-			klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", format.Pod(pod), container.Name, requestedByContainer, allocated.Size())
+			klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "requestedByContainer", requestedByContainer, "allocatedSize", allocated.Size())
 			// An empty list of hints will be treated as a preference that cannot be satisfied.
 			// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
 			// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
@@ -416,7 +415,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
 		}
 	}
 	if assignedCPUs.Size() == requested {
-		klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to pod %v", format.Pod(pod))
+		klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod))
 		return map[string][]topologymanager.TopologyHint{
 			string(v1.ResourceCPU): p.generateCPUTopologyHints(assignedCPUs, cpuset.CPUSet{}, requested),
 		}
@@ -434,7 +433,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
 
 	// Generate hints.
 	cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
-	klog.Infof("[cpumanager] TopologyHints generated for pod '%v' : %v", format.Pod(pod), cpuHints)
+	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "cpuHints", cpuHints)
 
 	return map[string][]topologymanager.TopologyHint{
 		string(v1.ResourceCPU): cpuHints,
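For reference, klog's default text format renders an InfoS call as a quoted constant message followed by the key/value pairs. A tiny sketch; the header fields in the comment (timestamp, pid, file:line) are illustrative only, not captured output:

```go
// Sketch of the output shape of a migrated call; exact header fields differ per run.
package main

import "k8s.io/klog/v2"

func main() {
	defer klog.Flush()
	klog.InfoS("Starting CPU manager", "policy", "static")
	// Approximate output:
	// I0401 12:00:00.000000    1234 main.go:8] "Starting CPU manager" policy="static"
}
```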