From d65309399a228bd0c085f5e0d1ce93d558aebfaa Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Thu, 1 Sep 2016 07:44:07 +0800 Subject: [PATCH] Kubelet: add SyncPod for new runtime API --- pkg/kubelet/container/sync_result.go | 2 + .../kuberuntime/kuberuntime_container.go | 84 ++++- .../kuberuntime/kuberuntime_manager.go | 355 +++++++++++++++++- .../kuberuntime/kuberuntime_manager_test.go | 47 ++- .../kuberuntime/kuberuntime_sandbox.go | 26 +- 5 files changed, 495 insertions(+), 19 deletions(-) diff --git a/pkg/kubelet/container/sync_result.go b/pkg/kubelet/container/sync_result.go index 61e115a9826..c8eeda8ce86 100644 --- a/pkg/kubelet/container/sync_result.go +++ b/pkg/kubelet/container/sync_result.go @@ -40,6 +40,7 @@ var ( ErrVerifyNonRoot = errors.New("VerifyNonRootError") ErrRunInitContainer = errors.New("RunInitContainerError") ErrCreatePodSandbox = errors.New("CreatePodSandboxError") + ErrConfigPodSandbox = errors.New("ConfigPodSandboxError") ErrKillPodSandbox = errors.New("KillPodSandboxError") ) @@ -59,6 +60,7 @@ const ( TeardownNetwork SyncAction = "TeardownNetwork" InitContainer SyncAction = "InitContainer" CreatePodSandbox SyncAction = "CreatePodSandbox" + ConfigPodSandbox SyncAction = "ConfigPodSandbox" KillPodSandbox SyncAction = "KillPodSandbox" ) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 48d4ec76885..ff9ca90ad1b 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -33,11 +33,84 @@ import ( runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/types" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/term" ) +// startContainer starts a container and returns a message indicates why it is failed on error. +// It starts the container through the following steps: +// * pull the image +// * create the container +// * start the container +// * run the post start lifecycle hooks (if applicable) +func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeApi.PodSandboxConfig, container *api.Container, pod *api.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, podIP string) (string, error) { + // Step 1: pull the image. + err, msg := m.imagePuller.EnsureImageExists(pod, container, pullSecrets) + if err != nil { + return msg, err + } + + // Step 2: create the container. + ref, err := kubecontainer.GenerateContainerRef(pod, container) + if err != nil { + glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err) + } + glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref) + + // For a new container, the RestartCount should be 0 + restartCount := 0 + containerStatus := podStatus.FindContainerStatusByName(container.Name) + if containerStatus != nil { + restartCount = containerStatus.RestartCount + 1 + } + + containerConfig, err := m.generateContainerConfig(container, pod, restartCount, podIP) + if err != nil { + m.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToCreateContainer, "Failed to create container with error: %v", err) + return "Generate Container Config Failed", err + } + containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig) + if err != nil { + m.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToCreateContainer, "Failed to create container with error: %v", err) + return "Create Container Failed", err + } + m.recorder.Eventf(ref, api.EventTypeNormal, events.CreatedContainer, "Created container with id %v", containerID) + if ref != nil { + m.containerRefManager.SetRef(kubecontainer.ContainerID{ + Type: m.runtimeName, + ID: containerID, + }, ref) + } + + // Step 3: start the container. + err = m.runtimeService.StartContainer(containerID) + if err != nil { + m.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToStartContainer, + "Failed to start container with id %v with error: %v", containerID, err) + return "Start Container Failed", err + } + m.recorder.Eventf(ref, api.EventTypeNormal, events.StartedContainer, "Started container with id %v", containerID) + + // Step 4: execute the post start hook. + if container.Lifecycle != nil && container.Lifecycle.PostStart != nil { + kubeContainerID := kubecontainer.ContainerID{ + Type: m.runtimeName, + ID: containerID, + } + msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart) + if handlerErr != nil { + err := fmt.Errorf("PostStart handler: %v", handlerErr) + m.generateContainerEvent(kubeContainerID, api.EventTypeWarning, events.FailedPostStartHook, msg) + m.killContainer(pod, kubeContainerID, container, "FailedPostStartHook", nil) + return "PostStart Hook Failed", err + } + } + + return "", nil +} + // getContainerLogsPath gets log path for container. func getContainerLogsPath(containerName string, podUID types.UID) string { return path.Join(podLogsRootDirectory, string(podUID), fmt.Sprintf("%s.log", containerName)) @@ -370,12 +443,11 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *api.Pod, containerID kube if pod != nil && containerSpec != nil && containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil { gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod) } - if gracePeriodOverride == nil { - // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs - if gracePeriod < minimumGracePeriodInSeconds { - gracePeriod = minimumGracePeriodInSeconds - } - } else { + // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs + if gracePeriod < minimumGracePeriodInSeconds { + gracePeriod = minimumGracePeriodInSeconds + } + if gracePeriodOverride != nil { gracePeriod = *gracePeriodOverride glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 910d4e3998a..3185e764f22 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -31,6 +31,7 @@ import ( internalApi "k8s.io/kubernetes/pkg/kubelet/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/network" @@ -39,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util/format" kubetypes "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" ) const ( @@ -306,15 +308,336 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err return result, nil } -// SyncPod syncs the running pod into the desired pod. -func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus, - podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, - backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { - result.Fail(fmt.Errorf("not implemented")) +// containerToKillInfo contains neccessary information to kill a container. +type containerToKillInfo struct { + // The spec of the container. + container *api.Container + // The name of the container. + name string + // The message indicates why the container will be killed. + message string +} + +// podContainerSpecChanges keeps information on changes that need to happen for a pod. +type podContainerSpecChanges struct { + // Whether need to create a new sandbox. + CreateSandbox bool + // The id of existing sandbox. It is used for starting containers in ContainersToStart. + SandboxID string + // The attempt number of creating sandboxes for the pod. + Attempt uint32 + + // ContainersToStart keeps a map of containers that need to be started, note that + // the key is index of the container inside pod.Spec.Containers, while + // the value is a message indicates why the container needs to start. + ContainersToStart map[int]string + // ContainersToKeep keeps a map of containers that need to be kept as is, note that + // the key is the container ID of the container, while + // the value is index of the container inside pod.Spec.Containers. + ContainersToKeep map[kubecontainer.ContainerID]int + // ContainersToKill keeps a map of containers that need to be killed, note that + // the key is the container ID of the container, while + // the value contains neccessary information to kill a container. + ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo +} + +// podSandboxChanged checks whether the spec of the pod is changed and returns +// (changed, new attempt, original sandboxID if exist). +func (m *kubeGenericRuntimeManager) podSandboxChanged(pod *api.Pod, podStatus *kubecontainer.PodStatus) (changed bool, attempt uint32, sandboxID string) { + if len(podStatus.SandboxStatuses) == 0 { + return true, 0, "" + } + + readySandboxCount := 0 + for _, s := range podStatus.SandboxStatuses { + if s.GetState() == runtimeApi.PodSandBoxState_READY { + readySandboxCount++ + } + } + + // Needs to create a new sandbox when readySandboxCount > 1 or the ready sandbox is not the latest one. + sandboxStatus := podStatus.SandboxStatuses[0] + if readySandboxCount > 1 || sandboxStatus.GetState() != runtimeApi.PodSandBoxState_READY { + return true, sandboxStatus.Metadata.GetAttempt() + 1, sandboxStatus.GetId() + } + + // Needs to create a new sandbox when network namespace changed. + if sandboxStatus.Linux != nil && sandboxStatus.Linux.Namespaces.Options != nil && + sandboxStatus.Linux.Namespaces.Options.GetHostNetwork() != kubecontainer.IsHostNetworkPod(pod) { + return true, sandboxStatus.Metadata.GetAttempt() + 1, "" + } + + return false, sandboxStatus.Metadata.GetAttempt(), sandboxStatus.GetId() +} + +// computePodContainerChanges checks whether the pod spec has changed and returns the changes if true. +func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *api.Pod, podStatus *kubecontainer.PodStatus) podContainerSpecChanges { + glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod) + + sandboxChanged, attempt, sandboxID := m.podSandboxChanged(pod, podStatus) + changes := podContainerSpecChanges{ + CreateSandbox: sandboxChanged, + SandboxID: sandboxID, + Attempt: attempt, + ContainersToStart: make(map[int]string), + ContainersToKeep: make(map[kubecontainer.ContainerID]int), + ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo), + } + + for index, container := range pod.Spec.Containers { + if sandboxChanged { + message := fmt.Sprintf("Container %+v's pod sandbox is dead, the container will be recreated.", container) + glog.Info(message) + changes.ContainersToStart[index] = message + continue + } + + containerStatus := podStatus.FindContainerStatusByName(container.Name) + if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning { + if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) { + message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container) + glog.Info(message) + changes.ContainersToStart[index] = message + } + continue + } + + expectedHash := kubecontainer.HashContainer(&container) + containerChanged := containerStatus.Hash != expectedHash + if containerChanged { + message := fmt.Sprintf("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", + pod.Name, container.Name, containerStatus.Hash, expectedHash) + glog.Info(message) + changes.ContainersToStart[index] = message + continue + } + + liveness, found := m.livenessManager.Get(containerStatus.ID) + if !found || liveness == proberesults.Success { + changes.ContainersToKeep[containerStatus.ID] = index + continue + } + if pod.Spec.RestartPolicy != api.RestartPolicyNever { + message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name) + glog.Info(message) + changes.ContainersToStart[index] = message + } + } + + // compute containers that to be killed + runningContainerStatues := podStatus.GetRunningContainerStatuses() + for _, containerStatus := range runningContainerStatues { + if _, keep := changes.ContainersToKeep[containerStatus.ID]; !keep { + var podContainer *api.Container + var killMessage string + for i, c := range pod.Spec.Containers { + if c.Name == containerStatus.Name { + podContainer = &pod.Spec.Containers[i] + killMessage = changes.ContainersToStart[i] + break + } + } + + changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{ + name: containerStatus.Name, + container: podContainer, + message: killMessage, + } + } + } + + return changes +} + +// SyncPod syncs the running pod into the desired pod by executing following steps: +// +// 1. Compute sandbox and container changes. +// 2. Kill pod sandbox if necessary. +// 3. Kill any containers that should not be running. +// 4. Create sandbox if necessary. +// 5. Create necessary containers +// +// TODO: support init containers in SyncPod. +func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { + // Step 1: Compute sandbox and container changes. + podContainerChanges := m.computePodContainerChanges(pod, podStatus) + glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod)) + if podContainerChanges.CreateSandbox { + ref, err := api.GetReference(pod) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err) + } + if podContainerChanges.SandboxID != "" { + m.recorder.Eventf(ref, api.EventTypeNormal, "SandboxChanged", "Pod sandbox changed, it will be killed and re-created.") + } else { + m.recorder.Eventf(ref, api.EventTypeNormal, "SandboxReceived", "Pod sandbox received, it will be created.") + } + + } + + // Step 2: Kill the pod if the sandbox has changed. + if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) { + if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 { + glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod)) + } else { + glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod)) + } + + killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus), nil) + result.AddPodSyncResult(killResult) + if killResult.Error() != nil { + glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error()) + return + } + } else { + // Step 3: kill any running containers in this pod which are not to keep. + for containerID, containerInfo := range podContainerChanges.ContainersToKill { + glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod)) + killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name) + result.AddSyncResult(killContainerResult) + if err := m.killContainer(pod, containerID, containerInfo.container, containerInfo.message, nil); err != nil { + killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) + glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err) + return + } + } + } + + // We pass the value of the podIP down to generatePodSandboxConfig and + // generateContainerConfig, which in turn passes it to various other + // functions, in order to facilitate functionality that requires this + // value (hosts file and downward API) and avoid races determining + // the pod IP in cases where a container requires restart but the + // podIP isn't in the status manager yet. + // + // We default to the IP in the passed-in pod status, and overwrite it if the + // sandbox needs to be (re)started. + podIP := "" + if podStatus != nil { + podIP = podStatus.IP + } + + // Step 4: Create a sandbox for the pod if necessary. + podSandboxID := podContainerChanges.SandboxID + if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 { + var msg string + var err error + + glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod)) + createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod)) + result.AddSyncResult(createSandboxResult) + podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt) + if err != nil { + createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg) + glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err) + return + } + + setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, podSandboxID) + result.AddSyncResult(setupNetworkResult) + if !kubecontainer.IsHostNetworkPod(pod) { + glog.V(3).Infof("Calling network plugin %s to setup pod for %s", m.networkPlugin.Name(), format.Pod(pod)) + // Setup pod network plugin with sandbox id + // TODO: rename the last param to sandboxID + err = m.networkPlugin.SetUpPod(pod.Namespace, pod.Name, kubecontainer.ContainerID{ + Type: m.runtimeName, + ID: podSandboxID, + }) + if err != nil { + message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v", format.Pod(pod), m.networkPlugin.Name(), err) + setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message) + glog.Error(message) + + killPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, format.Pod(pod)) + result.AddSyncResult(killPodSandboxResult) + if err := m.runtimeService.StopPodSandbox(podSandboxID); err != nil { + killPodSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error()) + glog.Errorf("Kill sandbox %q failed for pod %q: %v", podSandboxID, format.Pod(pod), err) + } + return + } + + podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID) + if err != nil { + glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod)) + result.Fail(err) + return + } + + // Overwrite the podIP passed in the pod status, since we just started the infra container. + podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus) + glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod)) + } + } + + // Get podSandboxConfig for containers to start. + configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID) + result.AddSyncResult(configPodSandboxResult) + podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt) + if err != nil { + message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) + glog.Error(message) + configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message) + return + } + + // Step 5: start containers in podContainerChanges.ContainersToStart. + for idx := range podContainerChanges.ContainersToStart { + container := &pod.Spec.Containers[idx] + startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name) + result.AddSyncResult(startContainerResult) + + isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff) + if isInBackOff { + startContainerResult.Fail(err, msg) + glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod)) + continue + } + + glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod)) + if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil { + startContainerResult.Fail(err, msg) + utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg)) + continue + } + } return } +// If a container is still in backoff, the function will return a brief backoff error and +// a detailed error message. +func (m *kubeGenericRuntimeManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) { + var cStatus *kubecontainer.ContainerStatus + for _, c := range podStatus.ContainerStatuses { + if c.Name == container.Name && c.State == kubecontainer.ContainerStateExited { + cStatus = c + break + } + } + + if cStatus == nil { + return false, "", nil + } + + glog.Infof("checking backoff for container %q in pod %q", container.Name, format.Pod(pod)) + // Use the finished time of the latest exited container as the start point to calculate whether to do back-off. + ts := cStatus.FinishedAt + // backOff requires a unique id to identify the container + stableName := fmt.Sprintf("%s_%s_%s_%s", pod.Name, pod.Namespace, string(pod.UID), container.Name) + if backOff.IsInBackOffSince(stableName, ts) { + if ref, err := kubecontainer.GenerateContainerRef(pod, container); err == nil { + m.recorder.Eventf(ref, api.EventTypeWarning, events.BackOffStartContainer, "Back-off restarting failed container") + } + err := fmt.Errorf("Back-off %s restarting failed container=%s pod=%s", backOff.Get(stableName), container.Name, format.Pod(pod)) + glog.Infof("%s", err.Error()) + return true, err.Error(), kubecontainer.ErrCrashLoopBackOff + } + + backOff.Next(stableName, ts) + return false, "", nil +} + // KillPod kills all the containers of a pod. Pod may be nil, running pod must not be. // gracePeriodOverride if specified allows the caller to override the pod default grace period. // only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data. @@ -409,11 +732,18 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp // Anyhow, we only promised "best-effort" restart count reporting, we can just ignore // these limitations now. // TODO: move this comment to SyncPod. - podFullName := kubecontainer.BuildPodFullName(name, namespace) podSandboxIDs, err := m.getSandboxIDByPodUID(string(uid), nil) if err != nil { return nil, err } + + podFullName := format.Pod(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: namespace, + UID: uid, + }, + }) glog.V(4).Infof("getSandboxIDByPodUID got sandbox IDs %q for pod %q(UID:%q)", podSandboxIDs, podFullName, string(uid)) sandboxStatuses := make([]*runtimeApi.PodSandboxStatus, len(podSandboxIDs)) @@ -432,12 +762,12 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp podIP = m.determinePodSandboxIP(namespace, name, podSandboxStatus) } - containerStatus, err := m.getKubeletContainerStatuses(podSandboxID) + statuses, err := m.getKubeletContainerStatuses(podSandboxID) if err != nil { glog.Errorf("getKubeletContainerStatuses for sandbox %s failed: %v", podSandboxID, err) return nil, err } - containerStatuses = append(containerStatuses, containerStatus...) + containerStatuses = append(containerStatuses, statuses...) } return &kubecontainer.PodStatus{ @@ -492,7 +822,14 @@ func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.Contai // GetPodContainerID gets pod sandbox ID func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) { - podFullName := kubecontainer.BuildPodFullName(pod.Name, pod.Namespace) + // TODO: add a format function for kubecontainer.Pod + podFullName := format.Pod(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: pod.ID, + }, + }) if len(pod.Sandboxes) == 0 { glog.Errorf("No sandboxes are found for pod %q", podFullName) return kubecontainer.ContainerID{}, fmt.Errorf("sandboxes for pod %q not found", podFullName) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index bba07b073d7..72be2d06cf3 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sort" "testing" + "time" "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" @@ -31,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" kubetypes "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/flowcontrol" ) var ( @@ -70,7 +72,7 @@ func makeAndSetFakePod(m *kubeGenericRuntimeManager, fakeRuntime *apitest.FakeRu } func makeFakePodSandbox(m *kubeGenericRuntimeManager, pod *api.Pod, createdAt int64) (*apitest.FakePodSandbox, error) { - config, err := m.generatePodSandboxConfig(pod, "", 0) + config, err := m.generatePodSandboxConfig(pod, 0) if err != nil { return nil, err } @@ -114,7 +116,7 @@ func makeFakeContainer(m *kubeGenericRuntimeManager, pod *api.Pod, container api } func makeFakeContainers(m *kubeGenericRuntimeManager, pod *api.Pod, containers []api.Container, createdAt int64) ([]*apitest.FakeContainer, error) { - sandboxConfig, err := m.generatePodSandboxConfig(pod, "", 0) + sandboxConfig, err := m.generatePodSandboxConfig(pod, 0) if err != nil { return nil, err } @@ -441,3 +443,44 @@ func TestKillPod(t *testing.T) { assert.Equal(t, runtimeApi.ContainerState_EXITED, c.GetState()) } } + +func TestSyncPod(t *testing.T) { + fakeRuntime, fakeImage, m, err := createTestRuntimeManager() + assert.NoError(t, err) + + containers := []api.Container{ + { + Name: "foo1", + Image: "busybox", + ImagePullPolicy: api.PullIfNotPresent, + }, + { + Name: "foo2", + Image: "alpine", + ImagePullPolicy: api.PullIfNotPresent, + }, + } + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: containers, + }, + } + + backOff := flowcontrol.NewBackOff(time.Second, time.Minute) + result := m.SyncPod(pod, api.PodStatus{}, &kubecontainer.PodStatus{}, []api.Secret{}, backOff) + assert.NoError(t, result.Error()) + assert.Equal(t, 2, len(fakeRuntime.Containers)) + assert.Equal(t, 2, len(fakeImage.Images)) + assert.Equal(t, 1, len(fakeRuntime.Sandboxes)) + for _, sandbox := range fakeRuntime.Sandboxes { + assert.Equal(t, runtimeApi.PodSandBoxState_READY, sandbox.GetState()) + } + for _, c := range fakeRuntime.Containers { + assert.Equal(t, runtimeApi.ContainerState_RUNNING, c.GetState()) + } +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go index d5674b7ca25..26489f36c7a 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go @@ -17,6 +17,7 @@ limitations under the License. package kuberuntime import ( + "fmt" "sort" "github.com/golang/glog" @@ -25,10 +26,30 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/format" ) +// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error). +func (m *kubeGenericRuntimeManager) createPodSandbox(pod *api.Pod, attempt uint32) (string, string, error) { + podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt) + if err != nil { + message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) + glog.Error(message) + return "", message, err + } + + podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig) + if err != nil { + message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err) + glog.Error(message) + return "", message, err + } + + return podSandBoxID, "", nil +} + // generatePodSandboxConfig generates pod sandbox config from api.Pod. -func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, podIP string, attempt uint32) (*runtimeApi.PodSandboxConfig, error) { +func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, attempt uint32) (*runtimeApi.PodSandboxConfig, error) { // TODO: deprecating podsandbox resource requirements in favor of the pod level cgroup // Refer https://github.com/kubernetes/kubernetes/issues/29871 podUID := string(pod.UID) @@ -63,7 +84,8 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, podIP cgroupParent := "" portMappings := []*runtimeApi.PortMapping{} for _, c := range pod.Spec.Containers { - opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, &c, podIP) + // TODO: use a separate interface to only generate portmappings + opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, &c, "") if err != nil { return nil, err }