Re-run init containers if the pod sandbox needs to be recreated

Whenever the pod sandbox needs to be recreated, all containers associated
with it are killed by the kubelet. This change ensures that the init
containers are rerun in such cases.

The change also refactors the computation of container changes so that the
control flow for init containers is more closely aligned with that of the
regular containers. Unit tests are added to verify the logic.
Yu-Ju Hong 2017-06-14 17:42:01 -07:00
parent 5c558ddb18
commit 152d8b9d96
4 changed files with 598 additions and 208 deletions
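
To make the intended control flow concrete, here is a minimal, self-contained sketch of the decision the new code makes when the sandbox has to be recreated (the types, fields, and function names below are simplified stand-ins for illustration, not the kubelet's real API): kill everything, and if the pod declares init containers, start over from the first init container instead of going straight to the app containers.

package main

import "fmt"

// Simplified stand-ins for the kubelet's pod spec and computed actions.
type pod struct {
    initContainers []string
    containers     []string
    restartNever   bool
}

type actions struct {
    killPod           bool
    createSandbox     bool
    nextInitToStart   string
    containersToStart []int
}

// computeActions mirrors the shape of the new logic: when the sandbox must be
// recreated, the whole pod is killed and, if init containers exist, execution
// restarts from the first init container.
func computeActions(p pod, sandboxDead bool, attempt int) actions {
    a := actions{killPod: sandboxDead, createSandbox: sandboxDead, containersToStart: []int{}}
    if !sandboxDead {
        return a // the regular per-container checks would happen here
    }
    if p.restartNever && attempt != 0 {
        return a // the pod should not be restarted
    }
    if len(p.initContainers) > 0 {
        a.nextInitToStart = p.initContainers[0] // re-run init from the beginning
        return a
    }
    for i := range p.containers {
        a.containersToStart = append(a.containersToStart, i)
    }
    return a
}

func main() {
    p := pod{initContainers: []string{"init1"}, containers: []string{"app"}}
    fmt.Printf("%+v\n", computeActions(p, true, 1))
}
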


@ -55,6 +55,18 @@ type FakeRuntimeService struct {
Sandboxes map[string]*FakePodSandbox
}
func (r *FakeRuntimeService) GetContainerID(sandboxID, name string, attempt uint32) (string, error) {
r.Lock()
defer r.Unlock()
for id, c := range r.Containers {
if c.SandboxID == sandboxID && c.Metadata.Name == name && c.Metadata.Attempt == attempt {
return id, nil
}
}
return "", fmt.Errorf("container (name, attempt, sandboxID)=(%q, %d, %q) not found", name, attempt, sandboxID)
}
func (r *FakeRuntimeService) SetFakeSandboxes(sandboxes []*FakePodSandbox) {
r.Lock()
defer r.Unlock()
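
As background for the tests later in this commit, the lookup the new fake-runtime helper performs can be sketched in isolation. The snippet below uses made-up stand-ins (a plain map and struct, not the real CRI types): a test resolves the ID of a specific container instance by (sandbox, name, attempt) instead of reconstructing container names by hand.

package main

import (
    "errors"
    "fmt"
)

// fakeContainer is a simplified stand-in for the fake runtime's bookkeeping.
type fakeContainer struct {
    sandboxID string
    name      string
    attempt   uint32
}

// getContainerID mirrors the lookup the new helper performs: find the ID of the
// container with the given name and attempt inside one sandbox.
func getContainerID(containers map[string]fakeContainer, sandboxID, name string, attempt uint32) (string, error) {
    for id, c := range containers {
        if c.sandboxID == sandboxID && c.name == name && c.attempt == attempt {
            return id, nil
        }
    }
    return "", errors.New("container not found")
}

func main() {
    containers := map[string]fakeContainer{
        "abc123": {sandboxID: "sb0", name: "init1", attempt: 0},
        "def456": {sandboxID: "sb0", name: "init1", attempt: 1},
    }
    id, err := getContainerID(containers, "sb0", "init1", 1)
    fmt.Println(id, err) // def456 <nil>
}
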


@ -631,10 +631,11 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, ru
return
}
-// pruneInitContainers ensures that before we begin creating init containers, we have reduced the number
-// of outstanding init containers still present. This reduces load on the container garbage collector
-// by only preserving the most recent terminated init container.
-func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus, initContainersToKeep map[kubecontainer.ContainerID]int) {
+// pruneInitContainersBeforeStart ensures that before we begin creating init
+// containers, we have reduced the number of outstanding init containers still
+// present. This reduces load on the container garbage collector by only
+// preserving the most recent terminated init container.
+func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
// only the last execution of each init container should be preserved, and only preserve it if it is in the
// list of init containers to keep.
initContainerNames := sets.NewString()
@ -652,14 +653,9 @@ func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod,
if count == 1 {
continue
}
-// if there is a reason to preserve the older container, do so
-if _, ok := initContainersToKeep[status.ID]; ok {
-continue
-}
// prune all other init containers that match this container name
glog.V(4).Infof("Removing init container %q instance %q %d", status.Name, status.ID.ID, count)
-if err := m.runtimeService.RemoveContainer(status.ID.ID); err != nil {
+if err := m.removeContainer(status.ID.ID); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
continue
}
@ -674,6 +670,37 @@ func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod,
}
}
// Remove all init containers. Note that this function does not check the state
// of the container because it assumes all init containers have been stopped
// before the call happens.
func (m *kubeGenericRuntimeManager) purgeInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
initContainerNames := sets.NewString()
for _, container := range pod.Spec.InitContainers {
initContainerNames.Insert(container.Name)
}
for name := range initContainerNames {
count := 0
for _, status := range podStatus.ContainerStatuses {
if status.Name != name || !initContainerNames.Has(status.Name) {
continue
}
count++
// Purge all init containers that match this container name
glog.V(4).Infof("Removing init container %q instance %q %d", status.Name, status.ID.ID, count)
if err := m.removeContainer(status.ID.ID); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
continue
}
// Remove any references to this container
if _, ok := m.containerRefManager.GetRef(status.ID); ok {
m.containerRefManager.ClearRef(status.ID)
} else {
glog.Warningf("No ref for container %q", status.ID)
}
}
}
}
// findNextInitContainerToRun returns the status of the last failed container, the
// next init container to start, or done if there are no further init containers.
// Status is only returned if an init container is failed, in which case next will

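
The two init-container helpers in this file differ in how aggressive they are, and a small sketch makes the distinction concrete. The code below uses simplified records rather than the kubelet's real status types and ignores container state: purge drops every instance of each init container (what happens before the sandbox is recreated), while prune keeps only the most recent instance per name.

package main

import "fmt"

// record is a simplified container record for illustration only.
type record struct {
    name    string
    attempt int
}

// purge drops every instance of the named init containers, mirroring what
// purgeInitContainers does before a sandbox is recreated.
func purge(records []record, initNames map[string]bool) []record {
    kept := []record{}
    for _, r := range records {
        if !initNames[r.name] {
            kept = append(kept, r)
        }
    }
    return kept
}

// prune keeps only the most recent instance of each named init container,
// mirroring the intent of pruneInitContainersBeforeStart.
func prune(records []record, initNames map[string]bool) []record {
    latest := map[string]record{}
    kept := []record{}
    for _, r := range records {
        if !initNames[r.name] {
            kept = append(kept, r)
            continue
        }
        if cur, ok := latest[r.name]; !ok || r.attempt > cur.attempt {
            latest[r.name] = r
        }
    }
    for _, r := range latest {
        kept = append(kept, r)
    }
    return kept
}

func main() {
    initNames := map[string]bool{"init1": true}
    records := []record{
        {name: "init1", attempt: 0},
        {name: "init1", attempt: 1},
        {name: "app", attempt: 0},
    }
    fmt.Println(purge(records, initNames)) // only {app 0} remains
    fmt.Println(prune(records, initNames)) // {app 0} and {init1 1}
}
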

@ -333,34 +333,28 @@ type containerToKillInfo struct {
message string
}
-// podContainerSpecChanges keeps information on changes that need to happen for a pod.
-type podContainerSpecChanges struct {
-// Whether need to create a new sandbox.
+// podActions keeps information what to do for a pod.
+type podActions struct {
+// Stop all running (regular and init) containers and the sandbox for the pod.
+KillPod bool
+// Whether need to create a new sandbox. If needed to kill pod and create a
+// a new pod sandbox, all init containers need to be purged (i.e., removed).
CreateSandbox bool
// The id of existing sandbox. It is used for starting containers in ContainersToStart.
SandboxID string
// The attempt number of creating sandboxes for the pod.
Attempt uint32
-// ContainersToStart keeps a map of containers that need to be started, note that
-// the key is index of the container inside pod.Spec.Containers, while
-// the value is a message indicates why the container needs to start.
-ContainersToStart map[int]string
-// ContainersToKeep keeps a map of containers that need to be kept as is, note that
-// the key is the container ID of the container, while
-// the value is index of the container inside pod.Spec.Containers.
-ContainersToKeep map[kubecontainer.ContainerID]int
+// The next init container to start.
+NextInitContainerToStart *v1.Container
+// ContainersToStart keeps a list of indexes for the containers to start,
+// where the index is the index of the specific container in the pod spec (
+// pod.Spec.Containers.
+ContainersToStart []int
// ContainersToKill keeps a map of containers that need to be killed, note that
// the key is the container ID of the container, while
// the value contains necessary information to kill a container.
ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
-// InitFailed indicates whether init containers are failed.
-InitFailed bool
-// InitContainersToKeep keeps a map of init containers that need to be kept as
-// is, note that the key is the container ID of the container, while
-// the value is index of the container inside pod.Spec.InitContainers.
-InitContainersToKeep map[kubecontainer.ContainerID]int
}
// podSandboxChanged checks whether the spec of the pod is changed and returns
@ -399,141 +393,127 @@ func (m *kubeGenericRuntimeManager) podSandboxChanged(pod *v1.Pod, podStatus *ku
return false, sandboxStatus.Metadata.Attempt, sandboxStatus.Id
}
-// checkAndKeepInitContainers keeps all successfully completed init containers. If there
-// are failing containers, only keep the first failing one.
-func checkAndKeepInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus, initContainersToKeep map[kubecontainer.ContainerID]int) bool {
-initFailed := false
-for i, container := range pod.Spec.InitContainers {
-containerStatus := podStatus.FindContainerStatusByName(container.Name)
-if containerStatus == nil {
-continue
-}
-if containerStatus.State == kubecontainer.ContainerStateRunning {
-initContainersToKeep[containerStatus.ID] = i
-continue
-}
-if containerStatus.State == kubecontainer.ContainerStateExited {
-initContainersToKeep[containerStatus.ID] = i
-}
-if isContainerFailed(containerStatus) {
-initFailed = true
-break
-}
-}
-return initFailed
-}
-// computePodContainerChanges checks whether the pod spec has changed and returns the changes if true.
-func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podContainerSpecChanges {
-glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)
-sandboxChanged, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
-changes := podContainerSpecChanges{
-CreateSandbox: sandboxChanged,
-SandboxID: sandboxID,
-Attempt: attempt,
-ContainersToStart: make(map[int]string),
-ContainersToKeep: make(map[kubecontainer.ContainerID]int),
-InitContainersToKeep: make(map[kubecontainer.ContainerID]int),
-ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),
-}
-// check the status of init containers.
-initFailed := false
-// always reset the init containers if the sandbox is changed.
-if !sandboxChanged {
-// Keep all successfully completed containers. If there are failing containers,
-// only keep the first failing one.
-initFailed = checkAndKeepInitContainers(pod, podStatus, changes.InitContainersToKeep)
-}
-changes.InitFailed = initFailed
-// check the status of containers.
-for index, container := range pod.Spec.Containers {
-containerStatus := podStatus.FindContainerStatusByName(container.Name)
-if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
-if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
-message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
-glog.Info(message)
-changes.ContainersToStart[index] = message
-}
-continue
-}
-if sandboxChanged {
-if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
-message := fmt.Sprintf("Container %+v's pod sandbox is dead, the container will be recreated.", container)
-glog.Info(message)
-changes.ContainersToStart[index] = message
-}
-continue
-}
-if initFailed {
-// Initialization failed and Container exists.
-// If we have an initialization failure everything will be killed anyway.
-// If RestartPolicy is Always or OnFailure we restart containers that were running before.
-if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
-message := fmt.Sprintf("Failed to initialize pod. %q will be restarted.", container.Name)
-glog.V(1).Info(message)
-changes.ContainersToStart[index] = message
-}
-continue
-}
-expectedHash := kubecontainer.HashContainer(&container)
-containerChanged := containerStatus.Hash != expectedHash
-if containerChanged {
-message := fmt.Sprintf("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.",
-pod.Name, container.Name, containerStatus.Hash, expectedHash)
-glog.Info(message)
-changes.ContainersToStart[index] = message
-continue
-}
-liveness, found := m.livenessManager.Get(containerStatus.ID)
-if !found || liveness == proberesults.Success {
-changes.ContainersToKeep[containerStatus.ID] = index
-continue
-}
-if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
-message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)
-glog.Info(message)
-changes.ContainersToStart[index] = message
-}
-}
-// Don't keep init containers if they are the only containers to keep.
-if !sandboxChanged && len(changes.ContainersToStart) == 0 && len(changes.ContainersToKeep) == 0 {
-changes.InitContainersToKeep = make(map[kubecontainer.ContainerID]int)
-}
-// compute containers to be killed
-runningContainerStatuses := podStatus.GetRunningContainerStatuses()
-for _, containerStatus := range runningContainerStatuses {
-_, keep := changes.ContainersToKeep[containerStatus.ID]
-_, keepInit := changes.InitContainersToKeep[containerStatus.ID]
-if !keep && !keepInit {
-var podContainer *v1.Container
-var killMessage string
-for i, c := range pod.Spec.Containers {
-if c.Name == containerStatus.Name {
-podContainer = &pod.Spec.Containers[i]
-killMessage = changes.ContainersToStart[i]
-break
-}
-}
-changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
-name: containerStatus.Name,
-container: podContainer,
-message: killMessage,
-}
-}
-}
+func containerChanged(container *v1.Container, containerStatus *kubecontainer.ContainerStatus) (uint64, uint64, bool) {
+expectedHash := kubecontainer.HashContainer(container)
+return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
+}
+func shouldRestartOnFailure(pod *v1.Pod) bool {
+return pod.Spec.RestartPolicy != v1.RestartPolicyNever
+}
+func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) bool {
+cStatus := podStatus.FindContainerStatusByName(c.Name)
+if cStatus == nil || cStatus.State == kubecontainer.ContainerStateRunning {
+return false
+}
+return cStatus.ExitCode == 0
+}
+// computePodActions checks whether the pod spec has changed and returns the changes if true.
+func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
+glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)
+createPodSandbox, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
+changes := podActions{
+KillPod: createPodSandbox,
+CreateSandbox: createPodSandbox,
+SandboxID: sandboxID,
+Attempt: attempt,
+ContainersToStart: []int{},
+ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),
+}
+// If we need to (re-)create the pod sandbox, everything will need to be
+// killed and recreated, and init containers should be purged.
+if createPodSandbox {
+if !shouldRestartOnFailure(pod) && attempt != 0 {
+// Should not restart the pod, just return.
+return changes
+}
+if len(pod.Spec.InitContainers) != 0 {
+// Pod has init containers, return the first one.
+changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
+return changes
+}
+// Start all containers by default but exclude the ones that succeeded if
+// RestartPolicy is OnFailure.
+for idx, c := range pod.Spec.Containers {
+if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
+continue
+}
+changes.ContainersToStart = append(changes.ContainersToStart, idx)
+}
+return changes
+}
+// Check initialization progress.
+initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
+if !done {
+if next != nil {
+initFailed := initLastStatus != nil && isContainerFailed(initLastStatus)
+if initFailed && !shouldRestartOnFailure(pod) {
+changes.KillPod = true
+} else {
+changes.NextInitContainerToStart = next
+}
+}
+// Initialization failed or still in progress. Skip inspecting non-init
+// containers.
+return changes
+}
+// Number of running containers to keep.
+keepCount := 0
+// check the status of containers.
+for idx, container := range pod.Spec.Containers {
+containerStatus := podStatus.FindContainerStatusByName(container.Name)
+// If container does not exist, or is not running, check whether we
+// need to restart it.
+if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
+if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
+message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
+glog.Info(message)
+changes.ContainersToStart = append(changes.ContainersToStart, idx)
+}
+continue
+}
+// The container is running, but kill the container if any of the following condition is met.
+reason := ""
+restart := shouldRestartOnFailure(pod)
+if expectedHash, actualHash, changed := containerChanged(&container, containerStatus); changed {
+reason = fmt.Sprintf("Container spec hash changed (%d vs %d).", actualHash, expectedHash)
+// Restart regardless of the restart policy because the container
+// spec changed.
+restart = true
+} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
+// If the container failed the liveness probe, we should kill it.
+reason = "Container failed liveness probe."
+} else {
+// Keep the container.
+keepCount += 1
+continue
+}
+// We need to kill the container, but if we also want to restart the
+// container afterwards, make the intent clear in the message. Also do
+// not kill the entire pod since we expect container to be running eventually.
+message := reason
+if restart {
+message = fmt.Sprintf("%s. Container will be killed and recreated.", message)
+changes.ContainersToStart = append(changes.ContainersToStart, idx)
+}
+changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
+name: containerStatus.Name,
+container: &pod.Spec.Containers[idx],
+message: message,
+}
+glog.V(2).Infof("Container %q (%q) of pod %s: %s", container.Name, containerStatus.ID, format.Pod(pod), message)
+}
+if keepCount == 0 && len(changes.ContainersToStart) == 0 {
+changes.KillPod = true
+}
return changes
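
For the per-container branch above, the kill/restart decision can be summarized in a small, self-contained sketch (the struct and function below are illustrative only, not the kubelet's types): a spec hash change always kills and restarts the container regardless of restart policy, a liveness failure kills it and restarts it only if the policy allows, and a healthy, unchanged container is kept.

package main

import "fmt"

// containerCheck holds the simplified per-container inputs.
type containerCheck struct {
    hashChanged    bool
    livenessFailed bool
    restartNever   bool
}

// decide mirrors the new per-container control flow.
func decide(c containerCheck) (kill, restart bool) {
    switch {
    case c.hashChanged:
        return true, true
    case c.livenessFailed:
        return true, !c.restartNever
    default:
        return false, false
    }
}

func main() {
    fmt.Println(decide(containerCheck{hashChanged: true, restartNever: true}))    // true true
    fmt.Println(decide(containerCheck{livenessFailed: true, restartNever: true})) // true false
    fmt.Println(decide(containerCheck{}))                                         // false false
}
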
@ -549,8 +529,8 @@ func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *v1.Pod, podS
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
-podContainerChanges := m.computePodContainerChanges(pod, podStatus)
-glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))
+podContainerChanges := m.computePodActions(pod, podStatus)
+glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(api.Scheme, pod)
if err != nil {
@ -559,13 +539,13 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
-glog.V(4).Infof("SyncPod received new pod %q, will create a new sandbox for it", format.Pod(pod))
+glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
}
}
// Step 2: Kill the pod if the sandbox has changed.
-if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {
-if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {
+if podContainerChanges.KillPod {
+if !podContainerChanges.CreateSandbox {
glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
} else {
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
@ -577,6 +557,10 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error()) glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return return
} }
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
@ -592,7 +576,9 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
}
// Keep terminated init containers fairly aggressively controlled
-m.pruneInitContainersBeforeStart(pod, podStatus, podContainerChanges.InitContainersToKeep)
+// This is an optimization because container removals are typically handled
+// by container garbage collector.
+m.pruneInitContainersBeforeStart(pod, podStatus)
// We pass the value of the podIP down to generatePodSandboxConfig and
// generateContainerConfig, which in turn passes it to various other
@ -610,7 +596,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
-if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {
+if podContainerChanges.CreateSandbox {
var msg string
var err error
@ -652,30 +638,11 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
return
}
-// Step 5: start init containers.
-status, next, done := findNextInitContainerToRun(pod, podStatus)
-if status != nil && status.ExitCode != 0 {
-// container initialization has failed, flag the pod as failed
-initContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)
-initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))
-result.AddSyncResult(initContainerResult)
-if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
-utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %#v", format.Pod(pod), status.Name, status))
-return
-}
-utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %#v", format.Pod(pod), status.Name, status))
-}
-if next != nil {
-if len(podContainerChanges.ContainersToStart) == 0 {
-glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))
-return
-}
-// If we need to start the next container, do so now then exit
-container := next
+// Step 5: start the init container.
+if container := podContainerChanges.NextInitContainerToStart; container != nil {
+// Start the next init container.
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
@ -692,20 +659,10 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
// Successfully started the container; clear the entry in the failure
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
-return
-}
-if !done {
-// init container still running
-glog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))
-return
-}
-if podContainerChanges.InitFailed {
-glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))
-return
-}
+}
// Step 6: start containers in podContainerChanges.ContainersToStart.
-for idx := range podContainerChanges.ContainersToStart {
+for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)

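
Putting the SyncPod changes together, the overall flow after this commit can be sketched at a high level. The stubs below are illustrative only, not the kubelet's real methods; the point is the ordering: compute the actions, kill the whole pod (and purge old init container instances) when the sandbox must be recreated, otherwise kill only the flagged containers, prune init containers, create the sandbox if needed, start at most one init container, then start the app containers.

package main

import "fmt"

// syncActions is a simplified stand-in for the computed podActions.
type syncActions struct {
    killPod, createSandbox bool
    nextInit               string
    toStart                []string
}

func syncPod(a syncActions) {
    // Step 1 (computing the actions) is assumed to have happened already.
    if a.killPod {
        fmt.Println("step 2: kill all containers and the sandbox")
        if a.createSandbox {
            fmt.Println("purge old init container instances")
        }
    } else {
        fmt.Println("step 3: kill only the containers flagged for removal")
    }
    fmt.Println("prune init containers, keeping the most recent terminated instance")
    if a.createSandbox {
        fmt.Println("step 4: create a new pod sandbox")
    }
    if a.nextInit != "" {
        // Only one init container is started per sync; in that case toStart is
        // empty, so the loop below does nothing.
        fmt.Println("step 5: start init container", a.nextInit)
    }
    for _, c := range a.toStart {
        fmt.Println("step 6: start app container", c)
    }
}

func main() {
    // A dead sandbox: recreate it and start over from the first init container.
    syncPod(syncActions{killPod: true, createSandbox: true, nextInit: "init1"})
}
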

@ -24,6 +24,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -226,6 +227,40 @@ func verifyFakeContainerList(fakeRuntime *apitest.FakeRuntimeService, expected [
return actual, reflect.DeepEqual(expected, actual)
}
type containerRecord struct {
container *v1.Container
attempt uint32
state runtimeapi.ContainerState
}
// Only extract the fields of interest.
type cRecord struct {
name string
attempt uint32
state runtimeapi.ContainerState
}
type cRecordList []*cRecord
func (b cRecordList) Len() int { return len(b) }
func (b cRecordList) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b cRecordList) Less(i, j int) bool {
if b[i].name != b[j].name {
return b[i].name < b[j].name
}
return b[i].attempt < b[j].attempt
}
func verifyContainerStatuses(t *testing.T, runtime *apitest.FakeRuntimeService, expected []*cRecord, desc string) {
actual := []*cRecord{}
for _, cStatus := range runtime.Containers {
actual = append(actual, &cRecord{name: cStatus.Metadata.Name, attempt: cStatus.Metadata.Attempt, state: cStatus.State})
}
sort.Sort(cRecordList(expected))
sort.Sort(cRecordList(actual))
assert.Equal(t, expected, actual, desc)
}
func TestNewKubeRuntimeManager(t *testing.T) {
_, _, _, err := createTestRuntimeManager()
assert.NoError(t, err)
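
The cRecord helpers above make the test comparison independent of the order in which the fake runtime returns containers: both the expected and the actual slices are sorted before asserting equality. A minimal sketch of the same idea, using sort.Slice instead of a sort.Interface implementation and plain printing instead of testify:

package main

import (
    "fmt"
    "sort"
)

// rec carries only the fields the comparison cares about.
type rec struct {
    name    string
    attempt uint32
}

// sortRecs orders records by name, then attempt, so two slices can be compared
// regardless of the order the runtime happened to return them in.
func sortRecs(rs []rec) {
    sort.Slice(rs, func(i, j int) bool {
        if rs[i].name != rs[j].name {
            return rs[i].name < rs[j].name
        }
        return rs[i].attempt < rs[j].attempt
    })
}

func main() {
    expected := []rec{{"init1", 1}, {"app", 0}}
    actual := []rec{{"app", 0}, {"init1", 1}}
    sortRecs(expected)
    sortRecs(actual)
    fmt.Println(fmt.Sprint(expected) == fmt.Sprint(actual)) // true
}
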
@ -582,8 +617,7 @@ func TestPruneInitContainers(t *testing.T) {
podStatus, err := m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
-keep := map[kubecontainer.ContainerID]int{}
-m.pruneInitContainersBeforeStart(pod, podStatus, keep)
+m.pruneInitContainersBeforeStart(pod, podStatus)
expectedContainers := []string{fakes[0].Id, fakes[2].Id}
if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
t.Errorf("expected %q, got %q", expectedContainers, actual)
@ -625,18 +659,6 @@ func TestSyncPodWithInitContainers(t *testing.T) {
},
}
-// buildContainerID is an internal helper function to build container id from api pod
-// and container with default attempt number 0.
-buildContainerID := func(pod *v1.Pod, container v1.Container) string {
-uid := string(pod.UID)
-sandboxID := apitest.BuildSandboxName(&runtimeapi.PodSandboxMetadata{
-Name: pod.Name,
-Uid: uid,
-Namespace: pod.Namespace,
-})
-return apitest.BuildContainerName(&runtimeapi.ContainerMetadata{Name: container.Name}, sandboxID)
-}
backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
// 1. should only create the init container.
@ -644,34 +666,406 @@ func TestSyncPodWithInitContainers(t *testing.T) {
assert.NoError(t, err)
result := m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
-assert.Equal(t, 1, len(fakeRuntime.Containers))
-initContainerID := buildContainerID(pod, initContainers[0])
-expectedContainers := []string{initContainerID}
-if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
-t.Errorf("expected %q, got %q", expectedContainers, actual)
-}
+expected := []*cRecord{
+{name: initContainers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
+}
+verifyContainerStatuses(t, fakeRuntime, expected, "start only the init container")
// 2. should not create app container because init container is still running.
podStatus, err = m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
result = m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
-assert.Equal(t, 1, len(fakeRuntime.Containers))
-expectedContainers = []string{initContainerID}
-if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
-t.Errorf("expected %q, got %q", expectedContainers, actual)
-}
+verifyContainerStatuses(t, fakeRuntime, expected, "init container still running; do nothing")
// 3. should create all app containers because init container finished.
-fakeRuntime.StopContainer(initContainerID, 0)
+// Stop init container instance 0.
+sandboxIDs, err := m.getSandboxIDByPodUID(pod.UID, nil)
+require.NoError(t, err)
+sandboxID := sandboxIDs[0]
+initID0, err := fakeRuntime.GetContainerID(sandboxID, initContainers[0].Name, 0)
+require.NoError(t, err)
+fakeRuntime.StopContainer(initID0, 0)
+// Sync again.
podStatus, err = m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
result = m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
-assert.Equal(t, 3, len(fakeRuntime.Containers))
-expectedContainers = []string{initContainerID, buildContainerID(pod, containers[0]),
-buildContainerID(pod, containers[1])}
-if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
-t.Errorf("expected %q, got %q", expectedContainers, actual)
-}
+expected = []*cRecord{
+{name: initContainers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_EXITED},
+{name: containers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
+{name: containers[1].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
+}
+verifyContainerStatuses(t, fakeRuntime, expected, "init container completed; all app containers should be running")
// 4. should restart the init container if needed to create a new pod sandbox
// Stop the pod sandbox.
fakeRuntime.StopPodSandbox(sandboxID)
// Sync again.
podStatus, err = m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
result = m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
expected = []*cRecord{
// The first init container instance is purged and no longer visible.
// The second (attempt == 1) instance has been started and is running.
{name: initContainers[0].Name, attempt: 1, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
// All containers are killed.
{name: containers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_EXITED},
{name: containers[1].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_EXITED},
}
verifyContainerStatuses(t, fakeRuntime, expected, "kill all app containers, purge the existing init container, and restart a new one")
}
// A helper function to get a basic pod and its status assuming all sandbox and
// containers are running and ready.
func makeBasePodAndStatus() (*v1.Pod, *kubecontainer.PodStatus) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345678",
Name: "foo",
Namespace: "foo-ns",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "foo1",
Image: "busybox",
},
{
Name: "foo2",
Image: "busybox",
},
{
Name: "foo3",
Image: "busybox",
},
},
},
}
status := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
SandboxStatuses: []*runtimeapi.PodSandboxStatus{
{
Id: "sandboxID",
State: runtimeapi.PodSandboxState_SANDBOX_READY,
Metadata: &runtimeapi.PodSandboxMetadata{Name: pod.Name, Namespace: pod.Namespace, Uid: "sandboxuid", Attempt: uint32(0)},
},
},
ContainerStatuses: []*kubecontainer.ContainerStatus{
{
ID: kubecontainer.ContainerID{ID: "id1"},
Name: "foo1", State: kubecontainer.ContainerStateRunning,
Hash: kubecontainer.HashContainer(&pod.Spec.Containers[0]),
},
{
ID: kubecontainer.ContainerID{ID: "id2"},
Name: "foo2", State: kubecontainer.ContainerStateRunning,
Hash: kubecontainer.HashContainer(&pod.Spec.Containers[1]),
},
{
ID: kubecontainer.ContainerID{ID: "id3"},
Name: "foo3", State: kubecontainer.ContainerStateRunning,
Hash: kubecontainer.HashContainer(&pod.Spec.Containers[2]),
},
},
}
return pod, status
}
func TestComputePodActions(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
require.NoError(t, err)
// Creating a reference pod and status for the test cases to refer
// to the specific fields.
basePod, baseStatus := makeBasePodAndStatus()
noAction := podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{},
ContainersToKill: map[kubecontainer.ContainerID]containerToKillInfo{},
}
for desc, test := range map[string]struct {
mutatePodFn func(*v1.Pod)
mutateStatusFn func(*kubecontainer.PodStatus)
actions podActions
}{
"everying is good; do nothing": {
actions: noAction,
},
"start pod sandbox and all containers for a new pod": {
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// No container or sandbox exists.
status.SandboxStatuses = []*runtimeapi.PodSandboxStatus{}
status.ContainerStatuses = []*kubecontainer.ContainerStatus{}
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
Attempt: uint32(0),
ContainersToStart: []int{0, 1, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"restart exited containers if RestartPolicy == Always": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// The first container completed, restart it,
status.ContainerStatuses[0].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[0].ExitCode = 0
// The second container exited with failure, restart it,
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 111
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{0, 1},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"restart failed containers if RestartPolicy == OnFailure": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyOnFailure },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// The first container completed, don't restart it,
status.ContainerStatuses[0].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[0].ExitCode = 0
// The second container exited with failure, restart it,
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 111
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{1},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"don't restart containers if RestartPolicy == Never": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyNever },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// Don't restart any containers.
status.ContainerStatuses[0].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[0].ExitCode = 0
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 111
},
actions: noAction,
},
"Kill pod and recreate everything if the pod sandbox is dead, and RestartPolicy == Always": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.SandboxStatuses[0].State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
Attempt: uint32(1),
ContainersToStart: []int{0, 1, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"Kill pod and recreate all containers (except for the succeeded one) if the pod sandbox is dead, and RestartPolicy == OnFailure": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyOnFailure },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.SandboxStatuses[0].State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 0
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
Attempt: uint32(1),
ContainersToStart: []int{0, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"Kill and recreate the container if the container's spec changed": {
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyAlways
},
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[1].Hash = uint64(432423432)
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToKill: getKillMap(basePod, baseStatus, []int{1}),
ContainersToStart: []int{1},
},
// TODO: Add a test case for containers which failed the liveness
// check. Will need to fake the liveness check result.
},
} {
pod, status := makeBasePodAndStatus()
if test.mutatePodFn != nil {
test.mutatePodFn(pod)
}
if test.mutateStatusFn != nil {
test.mutateStatusFn(status)
}
actions := m.computePodActions(pod, status)
verifyActions(t, &test.actions, &actions, desc)
} }
} }
func getKillMap(pod *v1.Pod, status *kubecontainer.PodStatus, cIndexes []int) map[kubecontainer.ContainerID]containerToKillInfo {
m := map[kubecontainer.ContainerID]containerToKillInfo{}
for _, i := range cIndexes {
m[status.ContainerStatuses[i].ID] = containerToKillInfo{
container: &pod.Spec.Containers[i],
name: pod.Spec.Containers[i].Name,
}
}
return m
}
func verifyActions(t *testing.T, expected, actual *podActions, desc string) {
if actual.ContainersToKill != nil {
// Clear the message field since we don't need to verify the message.
for k, info := range actual.ContainersToKill {
info.message = ""
actual.ContainersToKill[k] = info
}
}
assert.Equal(t, expected, actual, desc)
}
func TestComputePodActionsWithInitContainers(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
require.NoError(t, err)
// Creating a reference pod and status for the test cases to refer
// to the specific fields.
basePod, baseStatus := makeBasePodAndStatusWithInitContainers()
noAction := podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{},
ContainersToKill: map[kubecontainer.ContainerID]containerToKillInfo{},
}
for desc, test := range map[string]struct {
mutatePodFn func(*v1.Pod)
mutateStatusFn func(*kubecontainer.PodStatus)
actions podActions
}{
"initialization completed; start all containers": {
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{0, 1, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization in progress; do nothing": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].State = kubecontainer.ContainerStateRunning
},
actions: noAction,
},
"Kill pod and restart the first init container if the pod sandbox is dead": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.SandboxStatuses[0].State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
Attempt: uint32(1),
NextInitContainerToStart: &basePod.Spec.InitContainers[0],
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization failed; restart the last init container if RestartPolicy == Always": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].ExitCode = 137
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
NextInitContainerToStart: &basePod.Spec.InitContainers[2],
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization failed; restart the last init container if RestartPolicy == OnFailure": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyOnFailure },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].ExitCode = 137
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
NextInitContainerToStart: &basePod.Spec.InitContainers[2],
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization failed; kill pod if RestartPolicy == Never": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyNever },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].ExitCode = 137
},
actions: podActions{
KillPod: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
} {
pod, status := makeBasePodAndStatusWithInitContainers()
if test.mutatePodFn != nil {
test.mutatePodFn(pod)
}
if test.mutateStatusFn != nil {
test.mutateStatusFn(status)
}
actions := m.computePodActions(pod, status)
verifyActions(t, &test.actions, &actions, desc)
}
}
func makeBasePodAndStatusWithInitContainers() (*v1.Pod, *kubecontainer.PodStatus) {
pod, status := makeBasePodAndStatus()
pod.Spec.InitContainers = []v1.Container{
{
Name: "init1",
Image: "bar-image",
},
{
Name: "init2",
Image: "bar-image",
},
{
Name: "init3",
Image: "bar-image",
},
}
// Replace the original statuses of the containers with those for the init
// containers.
status.ContainerStatuses = []*kubecontainer.ContainerStatus{
{
ID: kubecontainer.ContainerID{ID: "initid1"},
Name: "init1", State: kubecontainer.ContainerStateExited,
Hash: kubecontainer.HashContainer(&pod.Spec.InitContainers[0]),
},
{
ID: kubecontainer.ContainerID{ID: "initid2"},
Name: "init2", State: kubecontainer.ContainerStateExited,
Hash: kubecontainer.HashContainer(&pod.Spec.InitContainers[0]),
},
{
ID: kubecontainer.ContainerID{ID: "initid3"},
Name: "init3", State: kubecontainer.ContainerStateExited,
Hash: kubecontainer.HashContainer(&pod.Spec.InitContainers[0]),
},
}
return pod, status
}