Kubelet: per-pod workers should avoid grabbing the pod array lock
The per-pod worker syncs the pod and container status, and writes the pod status to the pod status cache. Given that the worker already owns a copy of the pod, it can bypass the additional pod lookup step completely. This change adds a new generatePodStatusByPod() method to achieve this. In general, per-pod workers should avoid accessing the internal pod array entirely, as this may lead to high contention. This change also updates the return type of GetPodByFullName() to match its name, and consolidates GetPodByFullName() and GetPodByName().
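As context for the contention claim: the kubelet's pod array is guarded by an RWMutex (podLock in the diff below), so every lookup by full name scans the array under that lock. A minimal, self-contained sketch of the two paths, using stand-in types rather than the actual kubelet code:

package main

import (
	"fmt"
	"sync"
)

// Pod is a stand-in for api.Pod; Kubelet is a stand-in for the real kubelet.
type Pod struct {
	Name, Namespace string
}

type Kubelet struct {
	podLock sync.RWMutex
	pods    []Pod
}

// lookupByFullName mimics the old path: every call takes the array lock,
// so many concurrent per-pod workers contend on it.
func (kl *Kubelet) lookupByFullName(fullName string) (*Pod, bool) {
	kl.podLock.RLock()
	defer kl.podLock.RUnlock()
	for i := range kl.pods {
		if fmt.Sprintf("%s_%s", kl.pods[i].Name, kl.pods[i].Namespace) == fullName {
			return &kl.pods[i], true
		}
	}
	return nil, false
}

// statusByPod mimics generatePodStatusByPod: the worker already owns a copy
// of the pod, so no lock is needed to identify it.
func (kl *Kubelet) statusByPod(pod *Pod) string {
	return fmt.Sprintf("status for %s_%s", pod.Name, pod.Namespace)
}

func main() {
	kl := &Kubelet{pods: []Pod{{Name: "web", Namespace: "default"}}}
	if pod, ok := kl.lookupByFullName("web_default"); ok {
		fmt.Println(kl.statusByPod(pod)) // same result, no lock on the hot path
	}
}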
parent 1cbde2c5c7
commit 0d0fb5f07b
@@ -1245,12 +1245,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
 		result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created)
 		if err != nil {
 			// TODO(vmarmol): examine this logic.
-			glog.Infof("probe no-error: %s", container.Name)
+			glog.Infof("probe no-error: %q", container.Name)
 			containersToKeep[containerID] = index
 			continue
 		}
 		if result == probe.Success {
-			glog.Infof("probe success: %s", container.Name)
+			glog.Infof("probe success: %q", container.Name)
 			containersToKeep[containerID] = index
 			continue
 		}
@@ -1314,7 +1314,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
 
 	// Before returning, regenerate status and store it in the cache.
 	defer func() {
-		status, err := kl.generatePodStatus(podFullName, uid)
+		status, err := kl.generatePodStatusByPod(pod)
 		if err != nil {
 			glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
 		} else {
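The defer above is what keeps the cache fresh on every exit path of syncPod, including early error returns. A runnable sketch of the pattern, with hypothetical names standing in for the kubelet's cache:

package main

import "fmt"

// statusCache is a stand-in for the kubelet's pod status cache.
var statusCache = map[string]string{}

// syncPod mimics the pattern above: the deferred closure regenerates the
// status from the pod the worker already owns and stores it in the cache,
// on every return path, including early error returns.
func syncPod(podFullName string, fail bool) error {
	defer func() {
		statusCache[podFullName] = "regenerated"
	}()
	if fail {
		return fmt.Errorf("sync failed for %s", podFullName)
	}
	return nil
}

func main() {
	_ = syncPod("web_default", true)
	fmt.Println(statusCache["web_default"]) // "regenerated" even though sync failed
}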
@@ -1467,7 +1467,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
 		if _, ok := desiredVolumes[name]; !ok {
 			parts := strings.Split(name, "/")
 			if runningSet.Has(parts[0]) {
-				glog.Infof("volume %s, still has a container running %s, skipping teardown", name, parts[0])
+				glog.Infof("volume %q, still has a container running %q, skipping teardown", name, parts[0])
 				continue
 			}
 			//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
@@ -1767,9 +1767,9 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
 		}
 		_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
 		if err != nil {
-			glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod)
+			glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
 		} else {
-			glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod)
+			glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
 		}
 	}
 	t.Stop()
@@ -1887,7 +1887,16 @@ func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
 	return append([]api.Pod{}, kl.pods...), kl.mirrorPods
 }
 
+// GetPodByFullName provides the first pod that matches the full pod name, as well as whether the pod was found.
+func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
+	name, namespace, err := ParsePodFullName(podFullName)
+	if err != nil {
+		return nil, false
+	}
+	return kl.GetPodByName(namespace, name)
+}
+
 // GetPodByName provides the first pod that matches namespace and name, as well
 // as whether the pod was found.
 func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
 	kl.podLock.RLock()
 	defer kl.podLock.RUnlock()
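The consolidation means full-name lookups now parse the name and delegate to the single locked scan in GetPodByName. A sketch of the parsing step, assuming the "podname_namespace" full-name format of this era (the exact behavior of ParsePodFullName is not shown in this diff):

package main

import (
	"fmt"
	"strings"
)

// parsePodFullName is a hypothetical sketch of ParsePodFullName, assuming
// GetPodFullName produces "podname_namespace".
func parsePodFullName(podFullName string) (name, namespace string, err error) {
	parts := strings.Split(podFullName, "_")
	if len(parts) != 2 {
		return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
	}
	return parts[0], parts[1], nil
}

func main() {
	name, namespace, err := parsePodFullName("web_default")
	if err != nil {
		panic(err)
	}
	// GetPodByFullName now simply delegates to GetPodByName(namespace, name).
	fmt.Printf("name=%s namespace=%s\n", name, namespace)
}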
@@ -1917,10 +1926,10 @@ func (kl *Kubelet) updateNodeStatus() error {
 func (kl *Kubelet) tryUpdateNodeStatus() error {
 	node, err := kl.kubeClient.Nodes().Get(kl.hostname)
 	if err != nil {
-		return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
+		return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
 	}
 	if node == nil {
-		return fmt.Errorf("no node instance returned for %v", kl.hostname)
+		return fmt.Errorf("no node instance returned for %q", kl.hostname)
 	}
 
 	// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
@@ -2042,41 +2051,37 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
 	return ready
 }
 
-func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
-	kl.podLock.RLock()
-	defer kl.podLock.RUnlock()
-	for _, pod := range kl.pods {
-		if GetPodFullName(&pod) == podFullName {
-			return &pod.Spec, true
-		}
-	}
-	return nil, false
-}
-
 // GetPodStatus returns information from Docker about the containers in a pod
 func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
 	// Check to see if we have a cached version of the status.
 	cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
 	if found {
-		glog.V(3).Infof("Returning cached status for %s", podFullName)
+		glog.V(3).Infof("Returning cached status for %q", podFullName)
 		return cachedPodStatus, nil
 	}
 	return kl.generatePodStatus(podFullName, uid)
 }
 
 func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
-	glog.V(3).Infof("Generating status for %s", podFullName)
-
-	spec, found := kl.GetPodByFullName(podFullName)
+	pod, found := kl.GetPodByFullName(podFullName)
 	if !found {
-		return api.PodStatus{}, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
+		return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
 	}
+	return kl.generatePodStatusByPod(pod)
+}
 
-	podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, uid)
+// generatePodStatusByPod avoids pod lookup, which requires grabbing a lock,
+// by taking the pod directly.
+func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
+	podFullName := GetPodFullName(pod)
+	glog.V(3).Infof("Generating status for %q", podFullName)
+
+	spec := &pod.Spec
+	podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID)
 
 	if err != nil {
 		// Error handling
-		glog.Infof("Query docker container info for pod %s failed with error (%v)", podFullName, err)
+		glog.Infof("Query docker container info for pod %q failed with error (%v)", podFullName, err)
 		if strings.Contains(err.Error(), "resource temporarily unavailable") {
 			// Leave upstream layer to decide what to do
 			return api.PodStatus{}, err
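getPodStatusFromCache is not shown in this diff; presumably the status cache is guarded by its own lock, so cached reads never touch the pod array lock. A sketch under that assumption, with a string standing in for api.PodStatus:

package main

import (
	"fmt"
	"sync"
)

// statusCache sketches a cache guarded independently of the pod array, so
// cached status reads avoid podLock entirely.
type statusCache struct {
	mu       sync.RWMutex
	statuses map[string]string // podFullName -> status
}

func (c *statusCache) get(podFullName string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.statuses[podFullName]
	return s, ok
}

func (c *statusCache) set(podFullName, status string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[podFullName] = status
}

func main() {
	c := &statusCache{statuses: make(map[string]string)}
	c.set("web_default", "Running")
	if s, ok := c.get("web_default"); ok {
		fmt.Println("cached:", s)
	}
}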
@@ -2153,7 +2158,7 @@ func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, s
 	}
 	podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
 	if !found {
-		return fmt.Errorf("Unable to find pod infra container for pod %s, uid %v", podFullName, uid)
+		return fmt.Errorf("Unable to find pod infra container for pod %q, uid %v", podFullName, uid)
 	}
 	return kl.runner.PortForward(podInfraContainer.ID, port, stream)
 }
@@ -77,6 +77,7 @@ func TestRunOnce(t *testing.T) {
 		rootDirectory: "/tmp/kubelet",
 		recorder:      &record.FakeRecorder{},
 		cadvisor:      cadvisor,
+		podStatuses:   make(map[string]api.PodStatus),
 	}
 
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
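The test now initializes podStatuses because the deferred cache write in syncPod would otherwise assign into a nil map, which panics in Go (reads of a nil map are fine). A minimal demonstration:

package main

import "fmt"

func main() {
	var uninitialized map[string]string
	fmt.Println(uninitialized["k"]) // reading a nil map is fine: zero value

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // "assignment to entry in nil map"
		}
	}()
	uninitialized["k"] = "v" // writing to a nil map panics
}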