Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #85225 from DataDog/eric.mountain/cleanup_refmanager_master
Removes container RefManager
Commit 99c50dfd3c
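In short: the kubelet stops caching a v1.ObjectReference per container ID and instead derives the reference from the pod and container specs at the point of use, via kubecontainer.GenerateContainerRef (see the prober hunks below). A minimal, runnable sketch of the replacement pattern, using simplified stand-in types rather than the real kubelet API:

package main

import "fmt"

// Stand-ins for v1.Pod, v1.Container and v1.ObjectReference.
type Pod struct{ Namespace, Name string }
type Container struct{ Name string }
type ObjectReference struct{ Namespace, Name, FieldPath string }

// generateContainerRef mimics the shape of kubecontainer.GenerateContainerRef:
// the reference is computed on demand from the specs, so no per-ID cache (and
// no SetRef/ClearRef lifecycle bookkeeping) is needed.
func generateContainerRef(pod Pod, container Container) ObjectReference {
    return ObjectReference{
        Namespace: pod.Namespace,
        Name:      pod.Name,
        FieldPath: fmt.Sprintf("spec.containers{%s}", container.Name),
    }
}

func main() {
    ref := generateContainerRef(Pod{"default", "mypod"}, Container{"app"})
    fmt.Printf("event target: %s/%s %s\n", ref.Namespace, ref.Name, ref.FieldPath)
}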
@@ -5,7 +5,6 @@ go_library(
     srcs = [
         "cache.go",
         "container_gc.go",
-        "container_reference_manager.go",
         "helpers.go",
         "os.go",
         "ref.go",
@ -1,60 +0,0 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package container
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// RefManager manages the references for the containers.
|
||||
// The references are used for reporting events such as creation,
|
||||
// failure, etc. This manager is thread-safe, no locks are necessary
|
||||
// for the caller.
|
||||
type RefManager struct {
|
||||
sync.RWMutex
|
||||
containerIDToRef map[ContainerID]*v1.ObjectReference
|
||||
}
|
||||
|
||||
// NewRefManager creates and returns a container reference manager
|
||||
// with empty contents.
|
||||
func NewRefManager() *RefManager {
|
||||
return &RefManager{containerIDToRef: make(map[ContainerID]*v1.ObjectReference)}
|
||||
}
|
||||
|
||||
// SetRef stores a reference to a pod's container, associating it with the given container ID.
|
||||
func (c *RefManager) SetRef(id ContainerID, ref *v1.ObjectReference) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.containerIDToRef[id] = ref
|
||||
}
|
||||
|
||||
// ClearRef forgets the given container id and its associated container reference.
|
||||
func (c *RefManager) ClearRef(id ContainerID) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
delete(c.containerIDToRef, id)
|
||||
}
|
||||
|
||||
// GetRef returns the container reference of the given ID, or (nil, false) if none is stored.
|
||||
func (c *RefManager) GetRef(id ContainerID) (ref *v1.ObjectReference, ok bool) {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
ref, ok = c.containerIDToRef[id]
|
||||
return ref, ok
|
||||
}
|
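The deleted type is a small read-mostly cache: an embedded sync.RWMutex guarding a map, with write-locked SetRef/ClearRef and a read-locked GetRef. Every call site disappears in this same commit — SetRef in startContainer, ClearRef (guarded by a GetRef existence check) in killContainer and the init-container cleanup paths, GetRef in the prober's recordContainerEvent — which is why the file can simply be deleted rather than replaced.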
@@ -461,8 +461,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         Namespace: "",
     }
 
-    containerRefManager := kubecontainer.NewRefManager()
-
     oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
     if err != nil {
         return nil, err
@@ -608,7 +606,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         klet.livenessManager,
         klet.startupManager,
         seccompProfileRoot,
-        containerRefManager,
         machineInfo,
         klet,
         kubeDeps.OSInterface,
@@ -715,7 +712,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         klet.livenessManager,
         klet.startupManager,
         klet.runner,
-        containerRefManager,
         kubeDeps.Recorder)
 
     tokenManager := token.NewManager(kubeDeps.KubeClient)
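After this change the prober Manager is built from the status manager, the two probe-results managers, the command runner, and the event recorder alone; the matching signature change appears in the NewManager hunk further down.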
@@ -74,21 +74,20 @@ func (f *fakePodStateProvider) IsPodTerminated(uid types.UID) bool {
 func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
     recorder := &record.FakeRecorder{}
     kubeRuntimeManager := &kubeGenericRuntimeManager{
-        recorder:            recorder,
-        cpuCFSQuota:         false,
-        cpuCFSQuotaPeriod:   metav1.Duration{Duration: time.Microsecond * 100},
-        livenessManager:     proberesults.NewManager(),
-        startupManager:      proberesults.NewManager(),
-        containerRefManager: kubecontainer.NewRefManager(),
-        machineInfo:         machineInfo,
-        osInterface:         osInterface,
-        runtimeHelper:       runtimeHelper,
-        runtimeService:      runtimeService,
-        imageService:        imageService,
-        keyring:             keyring,
-        seccompProfileRoot:  fakeSeccompProfileRoot,
-        internalLifecycle:   cm.NewFakeInternalContainerLifecycle(),
-        logReduction:        logreduction.NewLogReduction(identicalErrorDelay),
+        recorder:           recorder,
+        cpuCFSQuota:        false,
+        cpuCFSQuotaPeriod:  metav1.Duration{Duration: time.Microsecond * 100},
+        livenessManager:    proberesults.NewManager(),
+        startupManager:     proberesults.NewManager(),
+        machineInfo:        machineInfo,
+        osInterface:        osInterface,
+        runtimeHelper:      runtimeHelper,
+        runtimeService:     runtimeService,
+        imageService:       imageService,
+        keyring:            keyring,
+        seccompProfileRoot: fakeSeccompProfileRoot,
+        internalLifecycle:  cm.NewFakeInternalContainerLifecycle(),
+        logReduction:       logreduction.NewLogReduction(identicalErrorDelay),
     }
 
     typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)
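Only one field is actually removed here: containerRefManager was the longest key in the literal, so deleting it lets gofmt tighten the column alignment of every remaining field, which is why the whole block renders as removed-and-re-added. The same cosmetic realignment shows up in the kubeGenericRuntimeManager struct and the prober hunks below.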
@@ -143,12 +143,6 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
     }
 
     // Step 2: create the container.
-    ref, err := kubecontainer.GenerateContainerRef(pod, container)
-    if err != nil {
-        klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
-    }
-    klog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
-
     // For a new container, the RestartCount should be 0
     restartCount := 0
     containerStatus := podStatus.FindContainerStatusByName(container.Name)
@@ -187,13 +181,6 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
     }
     m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))
 
-    if ref != nil {
-        m.containerRefManager.SetRef(kubecontainer.ContainerID{
-            Type: m.runtimeName,
-            ID:   containerID,
-        }, ref)
-    }
-
     // Step 3: start the container.
     err = m.runtimeService.StartContainer(containerID)
     if err != nil {
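Note that the CreatedContainer event above goes through m.recordContainerEvent, which does not consume the cached reference — it derives its own from the pod spec (the same move made explicit for the prober later in this diff), so the SetRef bookkeeping removed here had no remaining reader on this path. A runnable sketch of that self-deriving event-helper shape, with stand-in types (the real helper uses kubecontainer.GenerateContainerRef and record.EventRecorder):

package main

import "fmt"

type Pod struct{ Namespace, Name string }
type Container struct{ Name string }

// fakeRecorder stands in for record.EventRecorder.
type fakeRecorder struct{}

func (fakeRecorder) Eventf(target, eventType, reason, messageFmt string, args ...interface{}) {
    fmt.Printf("%s %s %s: %s\n", eventType, reason, target, fmt.Sprintf(messageFmt, args...))
}

// recordContainerEvent derives the event target from the specs it is handed,
// so callers need not thread a cached reference (or a container ID used to
// look one up) through every call.
func recordContainerEvent(rec fakeRecorder, pod Pod, c Container, eventType, reason, messageFmt string, args ...interface{}) {
    target := fmt.Sprintf("%s/%s spec.containers{%s}", pod.Namespace, pod.Name, c.Name)
    rec.Eventf(target, eventType, reason, messageFmt, args...)
}

func main() {
    recordContainerEvent(fakeRecorder{}, Pod{"default", "mypod"}, Container{"app"},
        "Normal", "Created", "Created container %s", "app")
}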
@@ -654,8 +641,6 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec
         klog.V(3).Infof("Container %q exited normally", containerID.String())
     }
 
-    m.containerRefManager.ClearRef(containerID)
-
     return err
 }
 
||||
@ -719,13 +704,6 @@ func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod,
|
||||
utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
|
||||
continue
|
||||
}
|
||||
|
||||
// remove any references to this container
|
||||
if _, ok := m.containerRefManager.GetRef(status.ID); ok {
|
||||
m.containerRefManager.ClearRef(status.ID)
|
||||
} else {
|
||||
klog.Warningf("No ref for container %q", status.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -751,12 +729,6 @@ func (m *kubeGenericRuntimeManager) purgeInitContainers(pod *v1.Pod, podStatus *
             utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
             continue
         }
-        // Remove any references to this container
-        if _, ok := m.containerRefManager.GetRef(status.ID); ok {
-            m.containerRefManager.ClearRef(status.ID)
-        } else {
-            klog.Warningf("No ref for container %q", status.ID)
-        }
         }
     }
 }
@@ -79,10 +79,9 @@ type podStateProvider interface {
 }
 
 type kubeGenericRuntimeManager struct {
-    runtimeName         string
-    recorder            record.EventRecorder
-    osInterface         kubecontainer.OSInterface
-    containerRefManager *kubecontainer.RefManager
+    runtimeName string
+    recorder    record.EventRecorder
+    osInterface kubecontainer.OSInterface
 
     // machineInfo contains the machine information.
     machineInfo *cadvisorapi.MachineInfo
@@ -154,7 +153,6 @@ func NewKubeGenericRuntimeManager(
     livenessManager proberesults.Manager,
     startupManager proberesults.Manager,
     seccompProfileRoot string,
-    containerRefManager *kubecontainer.RefManager,
     machineInfo *cadvisorapi.MachineInfo,
     podStateProvider podStateProvider,
     osInterface kubecontainer.OSInterface,
@@ -179,7 +177,6 @@ func NewKubeGenericRuntimeManager(
         seccompProfileRoot:  seccompProfileRoot,
         livenessManager:     livenessManager,
         startupManager:      startupManager,
-        containerRefManager: containerRefManager,
         machineInfo:         machineInfo,
         osInterface:         osInterface,
         runtimeHelper:       runtimeHelper,
@@ -20,7 +20,7 @@ import (
     "reflect"
     "sync"
 
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes/fake"
     "k8s.io/client-go/tools/record"
@@ -104,8 +104,6 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
 }
 
 func newTestManager() *manager {
-    refManager := kubecontainer.NewRefManager()
-    refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings.
     podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil)
     // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
     podManager.AddPod(getTestPod())
@@ -114,7 +112,6 @@ func newTestManager() *manager {
         results.NewManager(),
         results.NewManager(),
         nil, // runner
-        refManager,
         &record.FakeRecorder{},
     ).(*manager)
     // Don't actually execute probes.
@@ -56,15 +56,13 @@ type prober struct {
     tcp           tcpprobe.Prober
     runner        kubecontainer.ContainerCommandRunner
 
-    refManager *kubecontainer.RefManager
-    recorder   record.EventRecorder
+    recorder record.EventRecorder
 }
 
 // NewProber creates a Prober, it takes a command runner and
 // several container info managers.
 func newProber(
     runner kubecontainer.ContainerCommandRunner,
-    refManager *kubecontainer.RefManager,
     recorder record.EventRecorder) *prober {
 
     const followNonLocalRedirects = false
@@ -75,21 +73,16 @@ func newProber(
         startupHTTP: httpprobe.New(followNonLocalRedirects),
         tcp:         tcpprobe.New(),
         runner:      runner,
-        refManager:  refManager,
         recorder:    recorder,
     }
 }
 
 // recordContainerEvent should be used by the prober for all container related events.
-func (pb *prober) recordContainerEvent(pod *v1.Pod, container *v1.Container, containerID kubecontainer.ContainerID, eventType, reason, message string, args ...interface{}) {
-    var err error
-    ref, hasRef := pb.refManager.GetRef(containerID)
-    if !hasRef {
-        ref, err = kubecontainer.GenerateContainerRef(pod, container)
-        if err != nil {
-            klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
-            return
-        }
-    }
+func (pb *prober) recordContainerEvent(pod *v1.Pod, container *v1.Container, eventType, reason, message string, args ...interface{}) {
+    ref, err := kubecontainer.GenerateContainerRef(pod, container)
+    if err != nil {
+        klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
+        return
+    }
     pb.recorder.Eventf(ref, eventType, reason, message, args...)
 }
@@ -119,15 +112,15 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
     // Probe failed in one way or another.
     if err != nil {
         klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
-        pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
+        pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
     } else { // result != probe.Success
         klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
-        pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %v", probeType, output)
+        pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
     }
     return results.Failure, err
 }
 if result == probe.Warning {
-    pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %v", probeType, output)
+    pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
     klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
 } else {
     klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
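Besides dropping the now-unused containerID argument, these call sites switch the format verb for the probe output from %v to %s. For a plain string the two render identically; %s simply states the intent directly. A quick runnable illustration:

package main

import "fmt"

func main() {
    output := "connection refused"
    // %v uses the operand's default format, which for a string is the
    // string itself, so both lines print the same text.
    fmt.Printf("Liveness probe failed: %v\n", output)
    fmt.Printf("Liveness probe failed: %s\n", output)
}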
@@ -104,10 +104,9 @@ func NewManager(
     livenessManager results.Manager,
     startupManager results.Manager,
     runner kubecontainer.ContainerCommandRunner,
-    refManager *kubecontainer.RefManager,
     recorder record.EventRecorder) Manager {
 
-    prober := newProber(runner, refManager, recorder)
+    prober := newProber(runner, recorder)
     readinessManager := results.NewManager()
     return &manager{
         statusManager: statusManager,
@@ -25,7 +25,7 @@ import (
     "strings"
     "testing"
 
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/client-go/tools/record"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -285,8 +285,7 @@ func TestProbe(t *testing.T) {
     for i, test := range tests {
         for _, probeType := range [...]probeType{liveness, readiness, startup} {
             prober := &prober{
-                refManager: kubecontainer.NewRefManager(),
-                recorder:   &record.FakeRecorder{},
+                recorder: &record.FakeRecorder{},
             }
             testID := fmt.Sprintf("%d-%s", i, probeType)
             testContainer := v1.Container{Env: test.env}
@@ -21,13 +21,12 @@ import (
     "testing"
     "time"
 
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes/fake"
     "k8s.io/client-go/tools/record"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
     "k8s.io/kubernetes/pkg/kubelet/prober/results"
     "k8s.io/kubernetes/pkg/kubelet/status"
@@ -275,9 +274,8 @@ func TestHandleCrash(t *testing.T) {
 
     // Prober starts crashing.
     m.prober = &prober{
-        refManager: kubecontainer.NewRefManager(),
-        recorder:   &record.FakeRecorder{},
-        exec:       crashingExecProber{},
+        recorder: &record.FakeRecorder{},
+        exec:     crashingExecProber{},
     }
 
     // doProbe should recover from the crash, and keep going.