mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #31847 from feiskyer/syncpod
Automatic merge from submit-queue Kubelet: add SyncPod for new runtime API This PR adds implements of `SyncPod` for new runtime API. Note that init containers is not included, it will in another following PR. **DO-NOT-MERGE**. Based on #31322 and #31395, only last commit is for review. CC @yujuhong @Random-Liu and @kubernetes/sig-node @kubernetes/sig-rktnetes
This commit is contained in:
commit
dfe3a46672
@ -40,6 +40,7 @@ var (
|
|||||||
ErrVerifyNonRoot = errors.New("VerifyNonRootError")
|
ErrVerifyNonRoot = errors.New("VerifyNonRootError")
|
||||||
ErrRunInitContainer = errors.New("RunInitContainerError")
|
ErrRunInitContainer = errors.New("RunInitContainerError")
|
||||||
ErrCreatePodSandbox = errors.New("CreatePodSandboxError")
|
ErrCreatePodSandbox = errors.New("CreatePodSandboxError")
|
||||||
|
ErrConfigPodSandbox = errors.New("ConfigPodSandboxError")
|
||||||
ErrKillPodSandbox = errors.New("KillPodSandboxError")
|
ErrKillPodSandbox = errors.New("KillPodSandboxError")
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -59,6 +60,7 @@ const (
|
|||||||
TeardownNetwork SyncAction = "TeardownNetwork"
|
TeardownNetwork SyncAction = "TeardownNetwork"
|
||||||
InitContainer SyncAction = "InitContainer"
|
InitContainer SyncAction = "InitContainer"
|
||||||
CreatePodSandbox SyncAction = "CreatePodSandbox"
|
CreatePodSandbox SyncAction = "CreatePodSandbox"
|
||||||
|
ConfigPodSandbox SyncAction = "ConfigPodSandbox"
|
||||||
KillPodSandbox SyncAction = "KillPodSandbox"
|
KillPodSandbox SyncAction = "KillPodSandbox"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -33,11 +33,84 @@ import (
|
|||||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/term"
|
"k8s.io/kubernetes/pkg/util/term"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// startContainer starts a container and returns a message indicates why it is failed on error.
|
||||||
|
// It starts the container through the following steps:
|
||||||
|
// * pull the image
|
||||||
|
// * create the container
|
||||||
|
// * start the container
|
||||||
|
// * run the post start lifecycle hooks (if applicable)
|
||||||
|
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeApi.PodSandboxConfig, container *api.Container, pod *api.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, podIP string) (string, error) {
|
||||||
|
// Step 1: pull the image.
|
||||||
|
err, msg := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
|
||||||
|
if err != nil {
|
||||||
|
return msg, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: create the container.
|
||||||
|
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
|
||||||
|
|
||||||
|
// For a new container, the RestartCount should be 0
|
||||||
|
restartCount := 0
|
||||||
|
containerStatus := podStatus.FindContainerStatusByName(container.Name)
|
||||||
|
if containerStatus != nil {
|
||||||
|
restartCount = containerStatus.RestartCount + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
containerConfig, err := m.generateContainerConfig(container, pod, restartCount, podIP)
|
||||||
|
if err != nil {
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToCreateContainer, "Failed to create container with error: %v", err)
|
||||||
|
return "Generate Container Config Failed", err
|
||||||
|
}
|
||||||
|
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
|
||||||
|
if err != nil {
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToCreateContainer, "Failed to create container with error: %v", err)
|
||||||
|
return "Create Container Failed", err
|
||||||
|
}
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeNormal, events.CreatedContainer, "Created container with id %v", containerID)
|
||||||
|
if ref != nil {
|
||||||
|
m.containerRefManager.SetRef(kubecontainer.ContainerID{
|
||||||
|
Type: m.runtimeName,
|
||||||
|
ID: containerID,
|
||||||
|
}, ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3: start the container.
|
||||||
|
err = m.runtimeService.StartContainer(containerID)
|
||||||
|
if err != nil {
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToStartContainer,
|
||||||
|
"Failed to start container with id %v with error: %v", containerID, err)
|
||||||
|
return "Start Container Failed", err
|
||||||
|
}
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeNormal, events.StartedContainer, "Started container with id %v", containerID)
|
||||||
|
|
||||||
|
// Step 4: execute the post start hook.
|
||||||
|
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
|
||||||
|
kubeContainerID := kubecontainer.ContainerID{
|
||||||
|
Type: m.runtimeName,
|
||||||
|
ID: containerID,
|
||||||
|
}
|
||||||
|
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
|
||||||
|
if handlerErr != nil {
|
||||||
|
err := fmt.Errorf("PostStart handler: %v", handlerErr)
|
||||||
|
m.generateContainerEvent(kubeContainerID, api.EventTypeWarning, events.FailedPostStartHook, msg)
|
||||||
|
m.killContainer(pod, kubeContainerID, container, "FailedPostStartHook", nil)
|
||||||
|
return "PostStart Hook Failed", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
// getContainerLogsPath gets log path for container.
|
// getContainerLogsPath gets log path for container.
|
||||||
func getContainerLogsPath(containerName string, podUID types.UID) string {
|
func getContainerLogsPath(containerName string, podUID types.UID) string {
|
||||||
return path.Join(podLogsRootDirectory, string(podUID), fmt.Sprintf("%s.log", containerName))
|
return path.Join(podLogsRootDirectory, string(podUID), fmt.Sprintf("%s.log", containerName))
|
||||||
@ -370,12 +443,11 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *api.Pod, containerID kube
|
|||||||
if pod != nil && containerSpec != nil && containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil {
|
if pod != nil && containerSpec != nil && containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil {
|
||||||
gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
|
gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
|
||||||
}
|
}
|
||||||
if gracePeriodOverride == nil {
|
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
|
||||||
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
|
if gracePeriod < minimumGracePeriodInSeconds {
|
||||||
if gracePeriod < minimumGracePeriodInSeconds {
|
gracePeriod = minimumGracePeriodInSeconds
|
||||||
gracePeriod = minimumGracePeriodInSeconds
|
}
|
||||||
}
|
if gracePeriodOverride != nil {
|
||||||
} else {
|
|
||||||
gracePeriod = *gracePeriodOverride
|
gracePeriod = *gracePeriodOverride
|
||||||
glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
|
glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
|
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
|
||||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
@ -39,6 +40,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/types"
|
kubetypes "k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||||
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -306,15 +308,336 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncPod syncs the running pod into the desired pod.
|
// containerToKillInfo contains neccessary information to kill a container.
|
||||||
func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus,
|
type containerToKillInfo struct {
|
||||||
podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret,
|
// The spec of the container.
|
||||||
backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
|
container *api.Container
|
||||||
result.Fail(fmt.Errorf("not implemented"))
|
// The name of the container.
|
||||||
|
name string
|
||||||
|
// The message indicates why the container will be killed.
|
||||||
|
message string
|
||||||
|
}
|
||||||
|
|
||||||
|
// podContainerSpecChanges keeps information on changes that need to happen for a pod.
|
||||||
|
type podContainerSpecChanges struct {
|
||||||
|
// Whether need to create a new sandbox.
|
||||||
|
CreateSandbox bool
|
||||||
|
// The id of existing sandbox. It is used for starting containers in ContainersToStart.
|
||||||
|
SandboxID string
|
||||||
|
// The attempt number of creating sandboxes for the pod.
|
||||||
|
Attempt uint32
|
||||||
|
|
||||||
|
// ContainersToStart keeps a map of containers that need to be started, note that
|
||||||
|
// the key is index of the container inside pod.Spec.Containers, while
|
||||||
|
// the value is a message indicates why the container needs to start.
|
||||||
|
ContainersToStart map[int]string
|
||||||
|
// ContainersToKeep keeps a map of containers that need to be kept as is, note that
|
||||||
|
// the key is the container ID of the container, while
|
||||||
|
// the value is index of the container inside pod.Spec.Containers.
|
||||||
|
ContainersToKeep map[kubecontainer.ContainerID]int
|
||||||
|
// ContainersToKill keeps a map of containers that need to be killed, note that
|
||||||
|
// the key is the container ID of the container, while
|
||||||
|
// the value contains neccessary information to kill a container.
|
||||||
|
ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// podSandboxChanged checks whether the spec of the pod is changed and returns
|
||||||
|
// (changed, new attempt, original sandboxID if exist).
|
||||||
|
func (m *kubeGenericRuntimeManager) podSandboxChanged(pod *api.Pod, podStatus *kubecontainer.PodStatus) (changed bool, attempt uint32, sandboxID string) {
|
||||||
|
if len(podStatus.SandboxStatuses) == 0 {
|
||||||
|
return true, 0, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
readySandboxCount := 0
|
||||||
|
for _, s := range podStatus.SandboxStatuses {
|
||||||
|
if s.GetState() == runtimeApi.PodSandBoxState_READY {
|
||||||
|
readySandboxCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Needs to create a new sandbox when readySandboxCount > 1 or the ready sandbox is not the latest one.
|
||||||
|
sandboxStatus := podStatus.SandboxStatuses[0]
|
||||||
|
if readySandboxCount > 1 || sandboxStatus.GetState() != runtimeApi.PodSandBoxState_READY {
|
||||||
|
return true, sandboxStatus.Metadata.GetAttempt() + 1, sandboxStatus.GetId()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Needs to create a new sandbox when network namespace changed.
|
||||||
|
if sandboxStatus.Linux != nil && sandboxStatus.Linux.Namespaces.Options != nil &&
|
||||||
|
sandboxStatus.Linux.Namespaces.Options.GetHostNetwork() != kubecontainer.IsHostNetworkPod(pod) {
|
||||||
|
return true, sandboxStatus.Metadata.GetAttempt() + 1, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, sandboxStatus.Metadata.GetAttempt(), sandboxStatus.GetId()
|
||||||
|
}
|
||||||
|
|
||||||
|
// computePodContainerChanges checks whether the pod spec has changed and returns the changes if true.
|
||||||
|
func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *api.Pod, podStatus *kubecontainer.PodStatus) podContainerSpecChanges {
|
||||||
|
glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)
|
||||||
|
|
||||||
|
sandboxChanged, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
|
||||||
|
changes := podContainerSpecChanges{
|
||||||
|
CreateSandbox: sandboxChanged,
|
||||||
|
SandboxID: sandboxID,
|
||||||
|
Attempt: attempt,
|
||||||
|
ContainersToStart: make(map[int]string),
|
||||||
|
ContainersToKeep: make(map[kubecontainer.ContainerID]int),
|
||||||
|
ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, container := range pod.Spec.Containers {
|
||||||
|
if sandboxChanged {
|
||||||
|
message := fmt.Sprintf("Container %+v's pod sandbox is dead, the container will be recreated.", container)
|
||||||
|
glog.Info(message)
|
||||||
|
changes.ContainersToStart[index] = message
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
containerStatus := podStatus.FindContainerStatusByName(container.Name)
|
||||||
|
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
|
||||||
|
if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
|
||||||
|
message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
|
||||||
|
glog.Info(message)
|
||||||
|
changes.ContainersToStart[index] = message
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedHash := kubecontainer.HashContainer(&container)
|
||||||
|
containerChanged := containerStatus.Hash != expectedHash
|
||||||
|
if containerChanged {
|
||||||
|
message := fmt.Sprintf("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.",
|
||||||
|
pod.Name, container.Name, containerStatus.Hash, expectedHash)
|
||||||
|
glog.Info(message)
|
||||||
|
changes.ContainersToStart[index] = message
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
liveness, found := m.livenessManager.Get(containerStatus.ID)
|
||||||
|
if !found || liveness == proberesults.Success {
|
||||||
|
changes.ContainersToKeep[containerStatus.ID] = index
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
|
||||||
|
message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)
|
||||||
|
glog.Info(message)
|
||||||
|
changes.ContainersToStart[index] = message
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute containers that to be killed
|
||||||
|
runningContainerStatues := podStatus.GetRunningContainerStatuses()
|
||||||
|
for _, containerStatus := range runningContainerStatues {
|
||||||
|
if _, keep := changes.ContainersToKeep[containerStatus.ID]; !keep {
|
||||||
|
var podContainer *api.Container
|
||||||
|
var killMessage string
|
||||||
|
for i, c := range pod.Spec.Containers {
|
||||||
|
if c.Name == containerStatus.Name {
|
||||||
|
podContainer = &pod.Spec.Containers[i]
|
||||||
|
killMessage = changes.ContainersToStart[i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
|
||||||
|
name: containerStatus.Name,
|
||||||
|
container: podContainer,
|
||||||
|
message: killMessage,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return changes
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncPod syncs the running pod into the desired pod by executing following steps:
|
||||||
|
//
|
||||||
|
// 1. Compute sandbox and container changes.
|
||||||
|
// 2. Kill pod sandbox if necessary.
|
||||||
|
// 3. Kill any containers that should not be running.
|
||||||
|
// 4. Create sandbox if necessary.
|
||||||
|
// 5. Create necessary containers
|
||||||
|
//
|
||||||
|
// TODO: support init containers in SyncPod.
|
||||||
|
func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
|
||||||
|
// Step 1: Compute sandbox and container changes.
|
||||||
|
podContainerChanges := m.computePodContainerChanges(pod, podStatus)
|
||||||
|
glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))
|
||||||
|
if podContainerChanges.CreateSandbox {
|
||||||
|
ref, err := api.GetReference(pod)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
|
||||||
|
}
|
||||||
|
if podContainerChanges.SandboxID != "" {
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeNormal, "SandboxChanged", "Pod sandbox changed, it will be killed and re-created.")
|
||||||
|
} else {
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeNormal, "SandboxReceived", "Pod sandbox received, it will be created.")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: Kill the pod if the sandbox has changed.
|
||||||
|
if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {
|
||||||
|
if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {
|
||||||
|
glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
|
||||||
|
}
|
||||||
|
|
||||||
|
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus), nil)
|
||||||
|
result.AddPodSyncResult(killResult)
|
||||||
|
if killResult.Error() != nil {
|
||||||
|
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Step 3: kill any running containers in this pod which are not to keep.
|
||||||
|
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
|
||||||
|
glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
|
||||||
|
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
|
||||||
|
result.AddSyncResult(killContainerResult)
|
||||||
|
if err := m.killContainer(pod, containerID, containerInfo.container, containerInfo.message, nil); err != nil {
|
||||||
|
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
|
||||||
|
glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We pass the value of the podIP down to generatePodSandboxConfig and
|
||||||
|
// generateContainerConfig, which in turn passes it to various other
|
||||||
|
// functions, in order to facilitate functionality that requires this
|
||||||
|
// value (hosts file and downward API) and avoid races determining
|
||||||
|
// the pod IP in cases where a container requires restart but the
|
||||||
|
// podIP isn't in the status manager yet.
|
||||||
|
//
|
||||||
|
// We default to the IP in the passed-in pod status, and overwrite it if the
|
||||||
|
// sandbox needs to be (re)started.
|
||||||
|
podIP := ""
|
||||||
|
if podStatus != nil {
|
||||||
|
podIP = podStatus.IP
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 4: Create a sandbox for the pod if necessary.
|
||||||
|
podSandboxID := podContainerChanges.SandboxID
|
||||||
|
if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {
|
||||||
|
var msg string
|
||||||
|
var err error
|
||||||
|
|
||||||
|
glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
|
||||||
|
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
|
||||||
|
result.AddSyncResult(createSandboxResult)
|
||||||
|
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
|
||||||
|
if err != nil {
|
||||||
|
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
|
||||||
|
glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, podSandboxID)
|
||||||
|
result.AddSyncResult(setupNetworkResult)
|
||||||
|
if !kubecontainer.IsHostNetworkPod(pod) {
|
||||||
|
glog.V(3).Infof("Calling network plugin %s to setup pod for %s", m.networkPlugin.Name(), format.Pod(pod))
|
||||||
|
// Setup pod network plugin with sandbox id
|
||||||
|
// TODO: rename the last param to sandboxID
|
||||||
|
err = m.networkPlugin.SetUpPod(pod.Namespace, pod.Name, kubecontainer.ContainerID{
|
||||||
|
Type: m.runtimeName,
|
||||||
|
ID: podSandboxID,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v", format.Pod(pod), m.networkPlugin.Name(), err)
|
||||||
|
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
|
||||||
|
glog.Error(message)
|
||||||
|
|
||||||
|
killPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, format.Pod(pod))
|
||||||
|
result.AddSyncResult(killPodSandboxResult)
|
||||||
|
if err := m.runtimeService.StopPodSandbox(podSandboxID); err != nil {
|
||||||
|
killPodSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
|
||||||
|
glog.Errorf("Kill sandbox %q failed for pod %q: %v", podSandboxID, format.Pod(pod), err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
|
||||||
|
result.Fail(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Overwrite the podIP passed in the pod status, since we just started the infra container.
|
||||||
|
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
|
||||||
|
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get podSandboxConfig for containers to start.
|
||||||
|
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
|
||||||
|
result.AddSyncResult(configPodSandboxResult)
|
||||||
|
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
|
||||||
|
if err != nil {
|
||||||
|
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
|
||||||
|
glog.Error(message)
|
||||||
|
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 5: start containers in podContainerChanges.ContainersToStart.
|
||||||
|
for idx := range podContainerChanges.ContainersToStart {
|
||||||
|
container := &pod.Spec.Containers[idx]
|
||||||
|
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
|
||||||
|
result.AddSyncResult(startContainerResult)
|
||||||
|
|
||||||
|
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
|
||||||
|
if isInBackOff {
|
||||||
|
startContainerResult.Fail(err, msg)
|
||||||
|
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
|
||||||
|
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
|
||||||
|
startContainerResult.Fail(err, msg)
|
||||||
|
utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a container is still in backoff, the function will return a brief backoff error and
|
||||||
|
// a detailed error message.
|
||||||
|
func (m *kubeGenericRuntimeManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) {
|
||||||
|
var cStatus *kubecontainer.ContainerStatus
|
||||||
|
for _, c := range podStatus.ContainerStatuses {
|
||||||
|
if c.Name == container.Name && c.State == kubecontainer.ContainerStateExited {
|
||||||
|
cStatus = c
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cStatus == nil {
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("checking backoff for container %q in pod %q", container.Name, format.Pod(pod))
|
||||||
|
// Use the finished time of the latest exited container as the start point to calculate whether to do back-off.
|
||||||
|
ts := cStatus.FinishedAt
|
||||||
|
// backOff requires a unique id to identify the container
|
||||||
|
stableName := fmt.Sprintf("%s_%s_%s_%s", pod.Name, pod.Namespace, string(pod.UID), container.Name)
|
||||||
|
if backOff.IsInBackOffSince(stableName, ts) {
|
||||||
|
if ref, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
|
||||||
|
m.recorder.Eventf(ref, api.EventTypeWarning, events.BackOffStartContainer, "Back-off restarting failed container")
|
||||||
|
}
|
||||||
|
err := fmt.Errorf("Back-off %s restarting failed container=%s pod=%s", backOff.Get(stableName), container.Name, format.Pod(pod))
|
||||||
|
glog.Infof("%s", err.Error())
|
||||||
|
return true, err.Error(), kubecontainer.ErrCrashLoopBackOff
|
||||||
|
}
|
||||||
|
|
||||||
|
backOff.Next(stableName, ts)
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
|
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
|
||||||
// gracePeriodOverride if specified allows the caller to override the pod default grace period.
|
// gracePeriodOverride if specified allows the caller to override the pod default grace period.
|
||||||
// only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
|
// only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
|
||||||
@ -409,11 +732,18 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
|
|||||||
// Anyhow, we only promised "best-effort" restart count reporting, we can just ignore
|
// Anyhow, we only promised "best-effort" restart count reporting, we can just ignore
|
||||||
// these limitations now.
|
// these limitations now.
|
||||||
// TODO: move this comment to SyncPod.
|
// TODO: move this comment to SyncPod.
|
||||||
podFullName := kubecontainer.BuildPodFullName(name, namespace)
|
|
||||||
podSandboxIDs, err := m.getSandboxIDByPodUID(string(uid), nil)
|
podSandboxIDs, err := m.getSandboxIDByPodUID(string(uid), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
podFullName := format.Pod(&api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Namespace: namespace,
|
||||||
|
UID: uid,
|
||||||
|
},
|
||||||
|
})
|
||||||
glog.V(4).Infof("getSandboxIDByPodUID got sandbox IDs %q for pod %q(UID:%q)", podSandboxIDs, podFullName, string(uid))
|
glog.V(4).Infof("getSandboxIDByPodUID got sandbox IDs %q for pod %q(UID:%q)", podSandboxIDs, podFullName, string(uid))
|
||||||
|
|
||||||
sandboxStatuses := make([]*runtimeApi.PodSandboxStatus, len(podSandboxIDs))
|
sandboxStatuses := make([]*runtimeApi.PodSandboxStatus, len(podSandboxIDs))
|
||||||
@ -432,12 +762,12 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
|
|||||||
podIP = m.determinePodSandboxIP(namespace, name, podSandboxStatus)
|
podIP = m.determinePodSandboxIP(namespace, name, podSandboxStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
containerStatus, err := m.getKubeletContainerStatuses(podSandboxID)
|
statuses, err := m.getKubeletContainerStatuses(podSandboxID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("getKubeletContainerStatuses for sandbox %s failed: %v", podSandboxID, err)
|
glog.Errorf("getKubeletContainerStatuses for sandbox %s failed: %v", podSandboxID, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
containerStatuses = append(containerStatuses, containerStatus...)
|
containerStatuses = append(containerStatuses, statuses...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &kubecontainer.PodStatus{
|
return &kubecontainer.PodStatus{
|
||||||
@ -492,7 +822,14 @@ func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.Contai
|
|||||||
|
|
||||||
// GetPodContainerID gets pod sandbox ID
|
// GetPodContainerID gets pod sandbox ID
|
||||||
func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
|
func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
|
||||||
podFullName := kubecontainer.BuildPodFullName(pod.Name, pod.Namespace)
|
// TODO: add a format function for kubecontainer.Pod
|
||||||
|
podFullName := format.Pod(&api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: pod.Name,
|
||||||
|
Namespace: pod.Namespace,
|
||||||
|
UID: pod.ID,
|
||||||
|
},
|
||||||
|
})
|
||||||
if len(pod.Sandboxes) == 0 {
|
if len(pod.Sandboxes) == 0 {
|
||||||
glog.Errorf("No sandboxes are found for pod %q", podFullName)
|
glog.Errorf("No sandboxes are found for pod %q", podFullName)
|
||||||
return kubecontainer.ContainerID{}, fmt.Errorf("sandboxes for pod %q not found", podFullName)
|
return kubecontainer.ContainerID{}, fmt.Errorf("sandboxes for pod %q not found", podFullName)
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -31,6 +32,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/types"
|
kubetypes "k8s.io/kubernetes/pkg/types"
|
||||||
|
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -70,7 +72,7 @@ func makeAndSetFakePod(m *kubeGenericRuntimeManager, fakeRuntime *apitest.FakeRu
|
|||||||
}
|
}
|
||||||
|
|
||||||
func makeFakePodSandbox(m *kubeGenericRuntimeManager, pod *api.Pod, createdAt int64) (*apitest.FakePodSandbox, error) {
|
func makeFakePodSandbox(m *kubeGenericRuntimeManager, pod *api.Pod, createdAt int64) (*apitest.FakePodSandbox, error) {
|
||||||
config, err := m.generatePodSandboxConfig(pod, "", 0)
|
config, err := m.generatePodSandboxConfig(pod, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -114,7 +116,7 @@ func makeFakeContainer(m *kubeGenericRuntimeManager, pod *api.Pod, container api
|
|||||||
}
|
}
|
||||||
|
|
||||||
func makeFakeContainers(m *kubeGenericRuntimeManager, pod *api.Pod, containers []api.Container, createdAt int64) ([]*apitest.FakeContainer, error) {
|
func makeFakeContainers(m *kubeGenericRuntimeManager, pod *api.Pod, containers []api.Container, createdAt int64) ([]*apitest.FakeContainer, error) {
|
||||||
sandboxConfig, err := m.generatePodSandboxConfig(pod, "", 0)
|
sandboxConfig, err := m.generatePodSandboxConfig(pod, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -441,3 +443,44 @@ func TestKillPod(t *testing.T) {
|
|||||||
assert.Equal(t, runtimeApi.ContainerState_EXITED, c.GetState())
|
assert.Equal(t, runtimeApi.ContainerState_EXITED, c.GetState())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncPod(t *testing.T) {
|
||||||
|
fakeRuntime, fakeImage, m, err := createTestRuntimeManager()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
containers := []api.Container{
|
||||||
|
{
|
||||||
|
Name: "foo1",
|
||||||
|
Image: "busybox",
|
||||||
|
ImagePullPolicy: api.PullIfNotPresent,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "foo2",
|
||||||
|
Image: "alpine",
|
||||||
|
ImagePullPolicy: api.PullIfNotPresent,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
pod := &api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "12345678",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "new",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: containers,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
|
||||||
|
result := m.SyncPod(pod, api.PodStatus{}, &kubecontainer.PodStatus{}, []api.Secret{}, backOff)
|
||||||
|
assert.NoError(t, result.Error())
|
||||||
|
assert.Equal(t, 2, len(fakeRuntime.Containers))
|
||||||
|
assert.Equal(t, 2, len(fakeImage.Images))
|
||||||
|
assert.Equal(t, 1, len(fakeRuntime.Sandboxes))
|
||||||
|
for _, sandbox := range fakeRuntime.Sandboxes {
|
||||||
|
assert.Equal(t, runtimeApi.PodSandBoxState_READY, sandbox.GetState())
|
||||||
|
}
|
||||||
|
for _, c := range fakeRuntime.Containers {
|
||||||
|
assert.Equal(t, runtimeApi.ContainerState_RUNNING, c.GetState())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package kuberuntime
|
package kuberuntime
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -25,10 +26,30 @@ import (
|
|||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
|
||||||
|
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *api.Pod, attempt uint32) (string, string, error) {
|
||||||
|
podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
|
||||||
|
if err != nil {
|
||||||
|
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
|
||||||
|
glog.Error(message)
|
||||||
|
return "", message, err
|
||||||
|
}
|
||||||
|
|
||||||
|
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
|
||||||
|
if err != nil {
|
||||||
|
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
|
||||||
|
glog.Error(message)
|
||||||
|
return "", message, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return podSandBoxID, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
// generatePodSandboxConfig generates pod sandbox config from api.Pod.
|
// generatePodSandboxConfig generates pod sandbox config from api.Pod.
|
||||||
func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, podIP string, attempt uint32) (*runtimeApi.PodSandboxConfig, error) {
|
func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, attempt uint32) (*runtimeApi.PodSandboxConfig, error) {
|
||||||
// TODO: deprecating podsandbox resource requirements in favor of the pod level cgroup
|
// TODO: deprecating podsandbox resource requirements in favor of the pod level cgroup
|
||||||
// Refer https://github.com/kubernetes/kubernetes/issues/29871
|
// Refer https://github.com/kubernetes/kubernetes/issues/29871
|
||||||
podUID := string(pod.UID)
|
podUID := string(pod.UID)
|
||||||
@ -63,7 +84,8 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, podIP
|
|||||||
cgroupParent := ""
|
cgroupParent := ""
|
||||||
portMappings := []*runtimeApi.PortMapping{}
|
portMappings := []*runtimeApi.PortMapping{}
|
||||||
for _, c := range pod.Spec.Containers {
|
for _, c := range pod.Spec.Containers {
|
||||||
opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, &c, podIP)
|
// TODO: use a separate interface to only generate portmappings
|
||||||
|
opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, &c, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user