mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Refactor kubelet syncPod method
This commit is contained in:
parent
39dceb13a5
commit
3489d1ae01
@ -1183,56 +1183,184 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.BoundPod, container *api.Co
|
|||||||
return containerID, nil
|
return containerID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
|
// Structure keeping information on changes that need to happen for a pod. The semantics is as follows:
|
||||||
|
// - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed.
|
||||||
|
// Additionally if it is true then containersToKeep have to be empty
|
||||||
|
// - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container
|
||||||
|
// - containersToStart keeps indices of Specs of containers that have to be started.
|
||||||
|
// - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that
|
||||||
|
// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
|
||||||
|
// It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case
|
||||||
|
// Infra Container should be killed, hence it's removed from this map.
|
||||||
|
// - all running containers which are NOT contained in containersToKeep should be killed.
|
||||||
|
type podContainerChangesSpec struct {
|
||||||
|
startInfraContainer bool
|
||||||
|
infraContainerId dockertools.DockerID
|
||||||
|
containersToStart map[int]empty
|
||||||
|
containersToKeep map[dockertools.DockerID]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) computePodContainerChanges(pod *api.BoundPod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
|
||||||
podFullName := GetPodFullName(pod)
|
podFullName := GetPodFullName(pod)
|
||||||
uid := pod.UID
|
uid := pod.UID
|
||||||
glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)
|
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
|
||||||
|
|
||||||
err := kl.makePodDataDirs(pod)
|
err := kl.makePodDataDirs(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return podContainerChangesSpec{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
containersToStart := make(map[int]empty)
|
||||||
|
containersToKeep := make(map[dockertools.DockerID]int)
|
||||||
|
createPodInfraContainer := false
|
||||||
var podStatus api.PodStatus
|
var podStatus api.PodStatus
|
||||||
podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
|
podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
|
||||||
if !found {
|
if found {
|
||||||
glog.V(2).Infof("Pod infra container doesn't exist for pod %q, killing and re-creating the pod", podFullName)
|
glog.V(4).Infof("Found infra pod for %q", podFullName)
|
||||||
var count int
|
containersToKeep[podInfraContainerID] = -1
|
||||||
count, err = kl.killContainersInPod(pod, containersInPod)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
podInfraContainerID, err = kl.createPodInfraContainer(pod)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to introspect pod infra container: %v; Skipping pod %q", err, podFullName)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if count > 0 {
|
|
||||||
// Re-list everything, otherwise we'll think we're ok.
|
|
||||||
containersInPod, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error listing containers %#v", containersInPod)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
podStatus, err = kl.GetPodStatus(podFullName, uid)
|
podStatus, err = kl.GetPodStatus(podFullName, uid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
|
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
podStatus, err = kl.GetPodStatus(podFullName, uid)
|
glog.V(2).Infof("No Infra Container for %q found. All containers will be restarted.", podFullName)
|
||||||
if err != nil {
|
createPodInfraContainer = true
|
||||||
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
|
}
|
||||||
|
|
||||||
|
for index, container := range pod.Spec.Containers {
|
||||||
|
expectedHash := dockertools.HashContainer(&container)
|
||||||
|
if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
|
||||||
|
containerID := dockertools.DockerID(dockerContainer.ID)
|
||||||
|
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
|
||||||
|
|
||||||
|
if !createPodInfraContainer {
|
||||||
|
// look for changes in the container.
|
||||||
|
containerChanged := hash != 0 && hash != expectedHash
|
||||||
|
if !containerChanged {
|
||||||
|
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
|
||||||
|
if err != nil {
|
||||||
|
// TODO(vmarmol): examine this logic.
|
||||||
|
glog.Infof("probe no-error: %s", container.Name)
|
||||||
|
containersToKeep[containerID] = index
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if result == probe.Success {
|
||||||
|
glog.Infof("probe success: %s", container.Name)
|
||||||
|
containersToKeep[containerID] = index
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
|
||||||
|
containersToStart[index] = empty{}
|
||||||
|
} else {
|
||||||
|
glog.Infof("pod %q container %q hash changed (%d vs %d). Pod will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
|
||||||
|
createPodInfraContainer = true
|
||||||
|
delete(containersToKeep, podInfraContainerID)
|
||||||
|
// If we are to restart Infra Container then we move containersToKeep into containersToStart
|
||||||
|
// if RestartPolicy allows restarting failed containers.
|
||||||
|
if pod.Spec.RestartPolicy.Never == nil {
|
||||||
|
for _, v := range containersToKeep {
|
||||||
|
containersToStart[v] = empty{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
containersToStart[index] = empty{}
|
||||||
|
containersToKeep = make(map[dockertools.DockerID]int)
|
||||||
|
}
|
||||||
|
} else { // createPodInfraContainer == true and Container exists
|
||||||
|
// If we're creating infra containere everything will be killed anyway
|
||||||
|
// If RestartPolicy is Always or OnFailure we restart containers that were running before we
|
||||||
|
// killed them when restarting Infra Container.
|
||||||
|
if pod.Spec.RestartPolicy.Never == nil {
|
||||||
|
glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
|
||||||
|
containersToStart[index] = empty{}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if kl.shouldContainerBeRestarted(&container, pod) {
|
||||||
|
// If we are here it means that the container is dead and sould be restarted, or never existed and should
|
||||||
|
// be created. We may be inserting this ID again if the container has changed and it has
|
||||||
|
// RestartPolicy::Always, but it's not a big deal.
|
||||||
|
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
|
||||||
|
containersToStart[index] = empty{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
containersInPod.RemoveContainerWithID(podInfraContainerID)
|
|
||||||
|
|
||||||
ref, err := api.GetReference(pod)
|
// After the loop one of the following should be true:
|
||||||
if err != nil {
|
// - createPodInfraContainer is true and containersToKeep is empty
|
||||||
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
|
// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container
|
||||||
|
|
||||||
|
// If Infra container is the last running one, we don't want to keep it.
|
||||||
|
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
|
||||||
|
containersToKeep = make(map[dockertools.DockerID]int)
|
||||||
}
|
}
|
||||||
|
|
||||||
podVolumes, err := kl.mountExternalVolumes(pod)
|
return podContainerChangesSpec{
|
||||||
|
startInfraContainer: createPodInfraContainer,
|
||||||
|
infraContainerId: podInfraContainerID,
|
||||||
|
containersToStart: containersToStart,
|
||||||
|
containersToKeep: containersToKeep,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
|
||||||
|
podFullName := GetPodFullName(pod)
|
||||||
|
uid := pod.UID
|
||||||
|
containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
|
||||||
|
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) {
|
||||||
|
if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 {
|
||||||
|
glog.V(4).Infof("Killing Infra Container for %q becase all other containers are dead.", podFullName)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
|
||||||
|
}
|
||||||
|
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
||||||
|
if podInfraContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
|
||||||
|
if err := kl.killContainer(podInfraContainer); err != nil {
|
||||||
|
glog.Warningf("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err = kl.killContainersInPod(pod, containersInPod)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Otherwise kill any containers in this pod which are not specified as ones to keep.
|
||||||
|
for id, container := range containersInPod {
|
||||||
|
_, keep := containerChanges.containersToKeep[id]
|
||||||
|
if !keep {
|
||||||
|
glog.V(3).Infof("Killing unwanted container %+v", container)
|
||||||
|
err = kl.killContainer(container)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error killing container: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Starting phase: if we should create infra container then we do it first
|
||||||
|
var ref *api.ObjectReference
|
||||||
|
var podVolumes volumeMap
|
||||||
|
podInfraContainerID := containerChanges.infraContainerId
|
||||||
|
if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
|
||||||
|
ref, err = api.GetReference(pod)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
|
||||||
|
}
|
||||||
|
glog.Infof("Creating pod infra container for %q", podFullName)
|
||||||
|
podInfraContainerID, err = kl.createPodInfraContainer(pod)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mount volumes
|
||||||
|
podVolumes, err = kl.mountExternalVolumes(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ref != nil {
|
if ref != nil {
|
||||||
kl.recorder.Eventf(ref, "failedMount",
|
kl.recorder.Eventf(ref, "failedMount",
|
||||||
@ -1242,61 +1370,10 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.Docker
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, container := range pod.Spec.Containers {
|
// Start everything
|
||||||
expectedHash := dockertools.HashContainer(&container)
|
for container := range containerChanges.containersToStart {
|
||||||
dockerContainerName := dockertools.BuildDockerName(uid, podFullName, &container)
|
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
|
||||||
if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
|
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
|
||||||
containerID := dockertools.DockerID(dockerContainer.ID)
|
|
||||||
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
|
|
||||||
|
|
||||||
// look for changes in the container.
|
|
||||||
containerChanged := hash != 0 && hash != expectedHash
|
|
||||||
if !containerChanged {
|
|
||||||
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
|
|
||||||
if err != nil {
|
|
||||||
containersInPod.RemoveContainerWithID(containerID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if result == probe.Success {
|
|
||||||
containersInPod.RemoveContainerWithID(containerID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
|
|
||||||
} else {
|
|
||||||
glog.Infof("pod %q container %q hash changed (%d vs %d). Container will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
|
|
||||||
// Also kill associated pod infra container if the container changed.
|
|
||||||
if err := kl.killContainerByID(string(podInfraContainerID)); err != nil {
|
|
||||||
glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainerID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
containersInPod.RemoveContainerWithID(containerID)
|
|
||||||
}
|
|
||||||
containersInPod.RemoveContainerWithID(containerID)
|
|
||||||
if err := kl.killContainer(dockerContainer); err != nil {
|
|
||||||
glog.V(1).Infof("Failed to kill container %q: %v", dockerContainer.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !kl.shouldContainerBeRestarted(&container, pod) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(3).Infof("Container with name %s doesn't exist, creating", dockerContainerName)
|
|
||||||
|
|
||||||
containerID, err := kl.pullImageAndRunContainer(pod, &container, &podVolumes, podInfraContainerID)
|
|
||||||
if err == nil {
|
|
||||||
containersInPod.RemoveContainerWithID(containerID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Kill any remaining containers in this pod which were not identified above (guards against duplicates).
|
|
||||||
for _, container := range containersInPod {
|
|
||||||
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", pod.UID, container)
|
|
||||||
err = kl.killContainer(container)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error killing container: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -459,7 +459,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitGroup.Wait()
|
waitGroup.Wait()
|
||||||
verifyCalls(t, fakeDocker, []string{
|
verifyCalls(t, fakeDocker, []string{
|
||||||
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
|
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
|
||||||
|
|
||||||
fakeDocker.Lock()
|
fakeDocker.Lock()
|
||||||
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
|
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
|
||||||
@ -506,7 +506,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
|||||||
waitGroup.Wait()
|
waitGroup.Wait()
|
||||||
|
|
||||||
verifyCalls(t, fakeDocker, []string{
|
verifyCalls(t, fakeDocker, []string{
|
||||||
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
|
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
|
||||||
|
|
||||||
fakeDocker.Lock()
|
fakeDocker.Lock()
|
||||||
|
|
||||||
@ -556,7 +556,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
|||||||
waitGroup.Wait()
|
waitGroup.Wait()
|
||||||
|
|
||||||
verifyCalls(t, fakeDocker, []string{
|
verifyCalls(t, fakeDocker, []string{
|
||||||
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
|
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
|
||||||
|
|
||||||
fakeDocker.Lock()
|
fakeDocker.Lock()
|
||||||
|
|
||||||
@ -701,7 +701,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
|||||||
waitGroup.Wait()
|
waitGroup.Wait()
|
||||||
|
|
||||||
verifyCalls(t, fakeDocker, []string{
|
verifyCalls(t, fakeDocker, []string{
|
||||||
"list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
|
"list", "list", "stop", "create", "start", "inspect_container", "create", "start"})
|
||||||
|
|
||||||
// A map iteration is used to delete containers, so must not depend on
|
// A map iteration is used to delete containers, so must not depend on
|
||||||
// order here.
|
// order here.
|
||||||
@ -873,7 +873,7 @@ func TestSyncPodBadHash(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
|
//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
|
||||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list", "create", "start"})
|
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})
|
||||||
|
|
||||||
// A map interation is used to delete containers, so must not depend on
|
// A map interation is used to delete containers, so must not depend on
|
||||||
// order here.
|
// order here.
|
||||||
@ -924,7 +924,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start"})
|
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
|
||||||
|
|
||||||
// A map interation is used to delete containers, so must not depend on
|
// A map interation is used to delete containers, so must not depend on
|
||||||
// order here.
|
// order here.
|
||||||
@ -1987,7 +1987,16 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
|
|||||||
|
|
||||||
fakeDocker.Lock()
|
fakeDocker.Lock()
|
||||||
|
|
||||||
if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", "pull_always_image", "pull_if_not_present_image"}) {
|
pulledImageSet := make(map[string]empty)
|
||||||
|
for v := range puller.ImagesPulled {
|
||||||
|
pulledImageSet[puller.ImagesPulled[v]] = empty{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(pulledImageSet, map[string]empty{
|
||||||
|
"custom_image_name": {},
|
||||||
|
"pull_always_image": {},
|
||||||
|
"pull_if_not_present_image": {},
|
||||||
|
}) {
|
||||||
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
|
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user