mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #7610 from vmarmol/runtime-syncpod
Add SyncPod() to DockerManager and use it in Kubelet
This commit is contained in:
commit
ee473edd45
@ -553,7 +553,7 @@ func TestFindContainersByPod(t *testing.T) {
|
|||||||
}
|
}
|
||||||
fakeClient := &FakeDockerClient{}
|
fakeClient := &FakeDockerClient{}
|
||||||
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
|
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}, nil, nil, nil)
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
fakeClient.ContainerList = test.containerList
|
fakeClient.ContainerList = test.containerList
|
||||||
fakeClient.ExitedContainerList = test.exitedContainerList
|
fakeClient.ExitedContainerList = test.exitedContainerList
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
||||||
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
||||||
@ -93,6 +94,15 @@ type DockerManager struct {
|
|||||||
// with prober.
|
// with prober.
|
||||||
// Health check prober.
|
// Health check prober.
|
||||||
Prober prober.Prober
|
Prober prober.Prober
|
||||||
|
|
||||||
|
// Generator of runtime container options.
|
||||||
|
generator kubecontainer.RunContainerOptionsGenerator
|
||||||
|
|
||||||
|
// Runner of lifecycle events.
|
||||||
|
runner kubecontainer.HandlerRunner
|
||||||
|
|
||||||
|
// Hooks injected into the container runtime.
|
||||||
|
runtimeHooks kubecontainer.RuntimeHooks
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDockerManager(
|
func NewDockerManager(
|
||||||
@ -106,7 +116,10 @@ func NewDockerManager(
|
|||||||
containerLogsDir string,
|
containerLogsDir string,
|
||||||
osInterface kubecontainer.OSInterface,
|
osInterface kubecontainer.OSInterface,
|
||||||
networkPlugin network.NetworkPlugin,
|
networkPlugin network.NetworkPlugin,
|
||||||
prober prober.Prober) *DockerManager {
|
prober prober.Prober,
|
||||||
|
generator kubecontainer.RunContainerOptionsGenerator,
|
||||||
|
httpClient kubeletTypes.HttpGetter,
|
||||||
|
runtimeHooks kubecontainer.RuntimeHooks) *DockerManager {
|
||||||
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
|
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
|
||||||
// if there are any problems.
|
// if there are any problems.
|
||||||
dockerRoot := "/var/lib/docker"
|
dockerRoot := "/var/lib/docker"
|
||||||
@ -138,7 +151,7 @@ func NewDockerManager(
|
|||||||
}
|
}
|
||||||
|
|
||||||
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
|
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
|
||||||
return &DockerManager{
|
dm := &DockerManager{
|
||||||
client: client,
|
client: client,
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
readinessManager: readinessManager,
|
readinessManager: readinessManager,
|
||||||
@ -151,7 +164,11 @@ func NewDockerManager(
|
|||||||
containerLogsDir: containerLogsDir,
|
containerLogsDir: containerLogsDir,
|
||||||
networkPlugin: networkPlugin,
|
networkPlugin: networkPlugin,
|
||||||
Prober: prober,
|
Prober: prober,
|
||||||
|
generator: generator,
|
||||||
|
runtimeHooks: runtimeHooks,
|
||||||
}
|
}
|
||||||
|
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
|
||||||
|
return dm
|
||||||
}
|
}
|
||||||
|
|
||||||
// A cache which stores strings keyed by <pod_UID>_<container_name>.
|
// A cache which stores strings keyed by <pod_UID>_<container_name>.
|
||||||
@ -737,6 +754,7 @@ func (dm *DockerManager) ListImages() ([]kubecontainer.Image, error) {
|
|||||||
return images, nil
|
return images, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(vmarmol): Consider unexporting.
|
||||||
// PullImage pulls an image from network to local storage.
|
// PullImage pulls an image from network to local storage.
|
||||||
func (dm *DockerManager) PullImage(image string) error {
|
func (dm *DockerManager) PullImage(image string) error {
|
||||||
return dm.Puller.Pull(image)
|
return dm.Puller.Pull(image)
|
||||||
@ -1061,6 +1079,7 @@ func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Po
|
|||||||
return dm.killContainer(targetContainer.ID)
|
return dm.killContainer(targetContainer.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(vmarmol): Unexport this as it is no longer used externally.
|
||||||
// KillContainer kills a container identified by containerID.
|
// KillContainer kills a container identified by containerID.
|
||||||
// Internally, it invokes docker's StopContainer API with a timeout of 10s.
|
// Internally, it invokes docker's StopContainer API with a timeout of 10s.
|
||||||
// TODO: Deprecate this function in favor of KillContainerInPod.
|
// TODO: Deprecate this function in favor of KillContainerInPod.
|
||||||
@ -1084,14 +1103,15 @@ func (dm *DockerManager) killContainer(containerID types.UID) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(vmarmol): Unexport this as it is no longer used externally.
|
||||||
// Run a single container from a pod. Returns the docker container ID
|
// Run a single container from a pod. Returns the docker container ID
|
||||||
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
|
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
|
||||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
opts, err := generator.GenerateRunContainerOptions(pod, container, netMode, ipcMode)
|
opts, err := dm.generator.GenerateRunContainerOptions(pod, container, netMode, ipcMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -1107,7 +1127,7 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
|
|||||||
}
|
}
|
||||||
|
|
||||||
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
|
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
|
||||||
handlerErr := runner.Run(id, pod, container, container.Lifecycle.PostStart)
|
handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
|
||||||
if handlerErr != nil {
|
if handlerErr != nil {
|
||||||
dm.killContainer(types.UID(id))
|
dm.killContainer(types.UID(id))
|
||||||
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
|
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
|
||||||
@ -1127,8 +1147,8 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
|
|||||||
return kubeletTypes.DockerID(id), err
|
return kubeletTypes.DockerID(id), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreatePodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
|
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
|
||||||
func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (kubeletTypes.DockerID, error) {
|
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) {
|
||||||
// Use host networking if specified.
|
// Use host networking if specified.
|
||||||
netNamespace := ""
|
netNamespace := ""
|
||||||
var ports []api.ContainerPort
|
var ports []api.ContainerPort
|
||||||
@ -1172,7 +1192,7 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon
|
|||||||
dm.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
|
dm.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := dm.RunContainer(pod, container, generator, runner, netNamespace, "")
|
id, err := dm.RunContainer(pod, container, netNamespace, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -1211,8 +1231,7 @@ type PodContainerChangesSpec struct {
|
|||||||
ContainersToKeep map[kubeletTypes.DockerID]int
|
ContainersToKeep map[kubeletTypes.DockerID]int
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(vmarmol): This will soon be made non-public when its only use is internal.
|
func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
|
||||||
func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
|
|
||||||
podFullName := kubecontainer.GetPodFullName(pod)
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
uid := pod.UID
|
uid := pod.UID
|
||||||
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
|
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
|
||||||
@ -1318,3 +1337,97 @@ func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kub
|
|||||||
ContainersToKeep: containersToKeep,
|
ContainersToKeep: containersToKeep,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pull the image for the specified pod and container.
|
||||||
|
func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container) error {
|
||||||
|
present, err := dm.IsImagePresent(container.Image)
|
||||||
|
if err != nil {
|
||||||
|
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||||
|
}
|
||||||
|
if ref != nil {
|
||||||
|
dm.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !dm.runtimeHooks.ShouldPullImage(pod, container, present) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = dm.PullImage(container.Image)
|
||||||
|
dm.runtimeHooks.ReportImagePull(pod, container, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync the running pod to match the specified desired pod.
|
||||||
|
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
|
||||||
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
|
containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
|
||||||
|
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
|
||||||
|
if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
|
||||||
|
glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
||||||
|
err = dm.KillPod(runningPod)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Otherwise kill any containers in this pod which are not specified as ones to keep.
|
||||||
|
for _, container := range runningPod.Containers {
|
||||||
|
_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
|
||||||
|
if !keep {
|
||||||
|
glog.V(3).Infof("Killing unwanted container %+v", container)
|
||||||
|
err = dm.KillContainer(container.ID)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error killing container: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we should create infra container then we do it first.
|
||||||
|
podInfraContainerID := containerChanges.InfraContainerId
|
||||||
|
if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
|
||||||
|
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
|
||||||
|
podInfraContainerID, err = dm.createPodInfraContainer(pod)
|
||||||
|
|
||||||
|
// Call the networking plugin
|
||||||
|
if err == nil {
|
||||||
|
err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start everything
|
||||||
|
for container := range containerChanges.ContainersToStart {
|
||||||
|
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
|
||||||
|
containerSpec := &pod.Spec.Containers[container]
|
||||||
|
if err := dm.pullImage(pod, containerSpec); err != nil {
|
||||||
|
glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", containerSpec.Image, kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
|
||||||
|
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
|
||||||
|
_, err := dm.RunContainer(pod, containerSpec, namespaceMode, namespaceMode)
|
||||||
|
if err != nil {
|
||||||
|
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
||||||
|
glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -47,6 +47,9 @@ func NewFakeDockerManager() (*DockerManager, *FakeDockerClient) {
|
|||||||
0, 0, "",
|
0, 0, "",
|
||||||
kubecontainer.FakeOS{},
|
kubecontainer.FakeOS{},
|
||||||
networkPlugin,
|
networkPlugin,
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
nil)
|
nil)
|
||||||
|
|
||||||
return dockerManager, fakeDocker
|
return dockerManager, fakeDocker
|
||||||
|
@ -41,7 +41,6 @@ import (
|
|||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
||||||
@ -233,7 +232,6 @@ func NewMainKubelet(
|
|||||||
resourceContainer: resourceContainer,
|
resourceContainer: resourceContainer,
|
||||||
os: osInterface,
|
os: osInterface,
|
||||||
oomWatcher: oomWatcher,
|
oomWatcher: oomWatcher,
|
||||||
runtimeHooks: newKubeletRuntimeHooks(recorder),
|
|
||||||
cgroupRoot: cgroupRoot,
|
cgroupRoot: cgroupRoot,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,13 +251,15 @@ func NewMainKubelet(
|
|||||||
containerLogsDir,
|
containerLogsDir,
|
||||||
osInterface,
|
osInterface,
|
||||||
klet.networkPlugin,
|
klet.networkPlugin,
|
||||||
nil)
|
nil,
|
||||||
|
klet,
|
||||||
|
klet.httpClient,
|
||||||
|
newKubeletRuntimeHooks(recorder))
|
||||||
klet.runner = containerManager
|
klet.runner = containerManager
|
||||||
klet.containerManager = containerManager
|
klet.containerManager = containerManager
|
||||||
|
|
||||||
klet.podManager = newBasicPodManager(klet.kubeClient)
|
klet.podManager = newBasicPodManager(klet.kubeClient)
|
||||||
klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
|
klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
|
||||||
klet.handlerRunner = lifecycle.NewHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
|
|
||||||
|
|
||||||
// TODO(vmarmol): Remove when the circular dependency is removed :(
|
// TODO(vmarmol): Remove when the circular dependency is removed :(
|
||||||
containerManager.Prober = klet.prober
|
containerManager.Prober = klet.prober
|
||||||
@ -345,9 +345,6 @@ type Kubelet struct {
|
|||||||
// Healthy check prober.
|
// Healthy check prober.
|
||||||
prober prober.Prober
|
prober prober.Prober
|
||||||
|
|
||||||
// Container lifecycle handler runner.
|
|
||||||
handlerRunner kubecontainer.HandlerRunner
|
|
||||||
|
|
||||||
// Container readiness state manager.
|
// Container readiness state manager.
|
||||||
readinessManager *kubecontainer.ReadinessManager
|
readinessManager *kubecontainer.ReadinessManager
|
||||||
|
|
||||||
@ -403,10 +400,6 @@ type Kubelet struct {
|
|||||||
// Watcher of out of memory events.
|
// Watcher of out of memory events.
|
||||||
oomWatcher OOMWatcher
|
oomWatcher OOMWatcher
|
||||||
|
|
||||||
// TODO(vmarmol): Remove this when we only have to inject the hooks into the runtimes.
|
|
||||||
// Hooks injected into the container runtime.
|
|
||||||
runtimeHooks kubecontainer.RuntimeHooks
|
|
||||||
|
|
||||||
// If non-empty, pass this to the container runtime as the root cgroup.
|
// If non-empty, pass this to the container runtime as the root cgroup.
|
||||||
cgroupRoot string
|
cgroupRoot string
|
||||||
}
|
}
|
||||||
@ -871,29 +864,6 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
|
|||||||
return nameservers, searches, nil
|
return nameservers, searches, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pull the image for the specified pod and container.
|
|
||||||
func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error {
|
|
||||||
present, err := kl.containerManager.IsImagePresent(container.Image)
|
|
||||||
if err != nil {
|
|
||||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
|
||||||
}
|
|
||||||
if ref != nil {
|
|
||||||
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !kl.runtimeHooks.ShouldPullImage(pod, container, present) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err = kl.containerManager.PullImage(container.Image)
|
|
||||||
kl.runtimeHooks.ReportImagePull(pod, container, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Kill all running containers in a pod (includes the pod infra container).
|
// Kill all running containers in a pod (includes the pod infra container).
|
||||||
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
|
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
|
||||||
return kl.containerManager.KillPod(pod)
|
return kl.containerManager.KillPod(pod)
|
||||||
@ -951,44 +921,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
podStatus, err := kl.generatePodStatus(pod)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
containerChanges, err := kl.containerManager.ComputePodContainerChanges(pod, runningPod, podStatus)
|
|
||||||
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
|
|
||||||
if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
|
|
||||||
glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
|
||||||
err = kl.killPod(runningPod)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Otherwise kill any containers in this pod which are not specified as ones to keep.
|
|
||||||
for _, container := range runningPod.Containers {
|
|
||||||
_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
|
|
||||||
if !keep {
|
|
||||||
glog.V(3).Infof("Killing unwanted container %+v", container)
|
|
||||||
err = kl.containerManager.KillContainer(container.ID)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error killing container: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Starting phase:
|
// Starting phase:
|
||||||
ref, err := api.GetReference(pod)
|
ref, err := api.GetReference(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1006,37 +938,15 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
|
|||||||
}
|
}
|
||||||
kl.volumeManager.SetVolumes(pod.UID, podVolumes)
|
kl.volumeManager.SetVolumes(pod.UID, podVolumes)
|
||||||
|
|
||||||
// If we should create infra container then we do it first.
|
podStatus, err := kl.generatePodStatus(pod)
|
||||||
podInfraContainerID := containerChanges.InfraContainerId
|
|
||||||
if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
|
|
||||||
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
|
|
||||||
podInfraContainerID, err = kl.containerManager.CreatePodInfraContainer(pod, kl, kl.handlerRunner)
|
|
||||||
|
|
||||||
// Call the networking plugin
|
|
||||||
if err == nil {
|
|
||||||
err = kl.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
|
glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Start everything
|
err = kl.containerManager.SyncPod(pod, runningPod, podStatus)
|
||||||
for container := range containerChanges.ContainersToStart {
|
|
||||||
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
|
|
||||||
containerSpec := &pod.Spec.Containers[container]
|
|
||||||
if err := kl.pullImage(pod, containerSpec); err != nil {
|
|
||||||
glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", containerSpec.Image, kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
|
|
||||||
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
|
|
||||||
_, err := kl.containerManager.RunContainer(pod, containerSpec, kl, kl.handlerRunner, namespaceMode, namespaceMode)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
return err
|
||||||
glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if isStaticPod(pod) {
|
if isStaticPod(pod) {
|
||||||
|
@ -43,7 +43,6 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
||||||
@ -109,7 +108,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
podManager, fakeMirrorClient := newFakePodManager()
|
podManager, fakeMirrorClient := newFakePodManager()
|
||||||
kubelet.podManager = podManager
|
kubelet.podManager = podManager
|
||||||
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
||||||
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil)
|
runtimeHooks := newKubeletRuntimeHooks(kubelet.recorder)
|
||||||
|
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil, kubelet, &fakeHTTP{}, runtimeHooks)
|
||||||
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
|
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
|
||||||
kubelet.podWorkers = newPodWorkers(
|
kubelet.podWorkers = newPodWorkers(
|
||||||
kubelet.runtimeCache,
|
kubelet.runtimeCache,
|
||||||
@ -122,9 +122,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
|
kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
|
||||||
kubelet.containerManager.Prober = kubelet.prober
|
kubelet.containerManager.Prober = kubelet.prober
|
||||||
kubelet.handlerRunner = lifecycle.NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
|
||||||
kubelet.volumeManager = newVolumeManager()
|
kubelet.volumeManager = newVolumeManager()
|
||||||
kubelet.runtimeHooks = newKubeletRuntimeHooks(kubelet.recorder)
|
|
||||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -679,6 +677,16 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
|||||||
fakeDocker.Unlock()
|
fakeDocker.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fakeHTTP struct {
|
||||||
|
url string
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
|
||||||
|
f.url = url
|
||||||
|
return nil, f.err
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||||
@ -686,8 +694,12 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
|||||||
fakeDocker := testKubelet.fakeDocker
|
fakeDocker := testKubelet.fakeDocker
|
||||||
waitGroup := testKubelet.waitGroup
|
waitGroup := testKubelet.waitGroup
|
||||||
fakeHttp := fakeHTTP{}
|
fakeHttp := fakeHTTP{}
|
||||||
|
|
||||||
|
// Simulate HTTP failure. Re-create the containerManager to inject the failure.
|
||||||
kubelet.httpClient = &fakeHttp
|
kubelet.httpClient = &fakeHttp
|
||||||
kubelet.handlerRunner = lifecycle.NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
runtimeHooks := newKubeletRuntimeHooks(kubelet.recorder)
|
||||||
|
kubelet.containerManager = dockertools.NewDockerManager(kubelet.dockerClient, kubelet.recorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil, kubelet, kubelet.httpClient, runtimeHooks)
|
||||||
|
|
||||||
pods := []*api.Pod{
|
pods := []*api.Pod{
|
||||||
{
|
{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
@ -740,7 +752,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
|||||||
// Get pod status.
|
// Get pod status.
|
||||||
"list", "inspect_container", "inspect_image",
|
"list", "inspect_container", "inspect_image",
|
||||||
// Check the pod infra container.
|
// Check the pod infra container.
|
||||||
"inspect_container",
|
"inspect_container", "inspect_image",
|
||||||
// Create container.
|
// Create container.
|
||||||
"create", "start",
|
"create", "start",
|
||||||
// Get pod status.
|
// Get pod status.
|
||||||
@ -753,7 +765,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
fakeDocker.Unlock()
|
fakeDocker.Unlock()
|
||||||
if fakeHttp.url != "http://foo:8080/bar" {
|
if fakeHttp.url != "http://foo:8080/bar" {
|
||||||
t.Errorf("Unexpected handler: %s", fakeHttp.url)
|
t.Errorf("Unexpected handler: %q", fakeHttp.url)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1560,7 +1572,6 @@ func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port ui
|
|||||||
f.Stream = stream
|
f.Stream = stream
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRunInContainerNoSuchPod(t *testing.T) {
|
func TestRunInContainerNoSuchPod(t *testing.T) {
|
||||||
fakeCommandRunner := fakeContainerCommandRunner{}
|
fakeCommandRunner := fakeContainerCommandRunner{}
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
@ -1627,125 +1638,6 @@ func TestRunInContainer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRunHandlerExec(t *testing.T) {
|
|
||||||
fakeCommandRunner := fakeContainerCommandRunner{}
|
|
||||||
testKubelet := newTestKubelet(t)
|
|
||||||
kubelet := testKubelet.kubelet
|
|
||||||
fakeDocker := testKubelet.fakeDocker
|
|
||||||
kubelet.runner = &fakeCommandRunner
|
|
||||||
kubelet.handlerRunner = lifecycle.NewHandlerRunner(&fakeHTTP{}, kubelet.runner, kubelet.containerManager)
|
|
||||||
|
|
||||||
containerID := "abc1234"
|
|
||||||
podName := "podFoo"
|
|
||||||
podNamespace := "nsFoo"
|
|
||||||
containerName := "containerFoo"
|
|
||||||
|
|
||||||
fakeDocker.ContainerList = []docker.APIContainers{
|
|
||||||
{
|
|
||||||
ID: containerID,
|
|
||||||
Names: []string{"/k8s_" + containerName + "_" + podName + "_" + podNamespace + "_12345678_42"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
container := api.Container{
|
|
||||||
Name: containerName,
|
|
||||||
Lifecycle: &api.Lifecycle{
|
|
||||||
PostStart: &api.Handler{
|
|
||||||
Exec: &api.ExecAction{
|
|
||||||
Command: []string{"ls", "-a"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
pod := api.Pod{}
|
|
||||||
pod.ObjectMeta.Name = podName
|
|
||||||
pod.ObjectMeta.Namespace = podNamespace
|
|
||||||
pod.Spec.Containers = []api.Container{container}
|
|
||||||
err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if fakeCommandRunner.ID != containerID ||
|
|
||||||
!reflect.DeepEqual(container.Lifecycle.PostStart.Exec.Command, fakeCommandRunner.Cmd) {
|
|
||||||
t.Errorf("unexpected commands: %v", fakeCommandRunner)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakeHTTP struct {
|
|
||||||
url string
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
|
|
||||||
f.url = url
|
|
||||||
return nil, f.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRunHandlerHttp(t *testing.T) {
|
|
||||||
fakeHttp := fakeHTTP{}
|
|
||||||
|
|
||||||
testKubelet := newTestKubelet(t)
|
|
||||||
kubelet := testKubelet.kubelet
|
|
||||||
kubelet.httpClient = &fakeHttp
|
|
||||||
kubelet.handlerRunner = lifecycle.NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
|
||||||
|
|
||||||
containerID := "abc1234"
|
|
||||||
podName := "podFoo"
|
|
||||||
podNamespace := "nsFoo"
|
|
||||||
containerName := "containerFoo"
|
|
||||||
|
|
||||||
container := api.Container{
|
|
||||||
Name: containerName,
|
|
||||||
Lifecycle: &api.Lifecycle{
|
|
||||||
PostStart: &api.Handler{
|
|
||||||
HTTPGet: &api.HTTPGetAction{
|
|
||||||
Host: "foo",
|
|
||||||
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
|
|
||||||
Path: "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
pod := api.Pod{}
|
|
||||||
pod.ObjectMeta.Name = podName
|
|
||||||
pod.ObjectMeta.Namespace = podNamespace
|
|
||||||
pod.Spec.Containers = []api.Container{container}
|
|
||||||
err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if fakeHttp.url != "http://foo:8080/bar" {
|
|
||||||
t.Errorf("unexpected url: %s", fakeHttp.url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRunHandlerNil(t *testing.T) {
|
|
||||||
testKubelet := newTestKubelet(t)
|
|
||||||
kubelet := testKubelet.kubelet
|
|
||||||
|
|
||||||
containerID := "abc1234"
|
|
||||||
podName := "podFoo"
|
|
||||||
podNamespace := "nsFoo"
|
|
||||||
containerName := "containerFoo"
|
|
||||||
|
|
||||||
container := api.Container{
|
|
||||||
Name: containerName,
|
|
||||||
Lifecycle: &api.Lifecycle{
|
|
||||||
PostStart: &api.Handler{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
pod := api.Pod{}
|
|
||||||
pod.ObjectMeta.Name = podName
|
|
||||||
pod.ObjectMeta.Namespace = podNamespace
|
|
||||||
pod.Spec.Containers = []api.Container{container}
|
|
||||||
err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("expect error, but got nil")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSyncPodEventHandlerFails(t *testing.T) {
|
func TestSyncPodEventHandlerFails(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||||
@ -1753,10 +1645,12 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
|||||||
fakeDocker := testKubelet.fakeDocker
|
fakeDocker := testKubelet.fakeDocker
|
||||||
waitGroup := testKubelet.waitGroup
|
waitGroup := testKubelet.waitGroup
|
||||||
|
|
||||||
|
// Simulate HTTP failure. Re-create the containerManager to inject the failure.
|
||||||
kubelet.httpClient = &fakeHTTP{
|
kubelet.httpClient = &fakeHTTP{
|
||||||
err: fmt.Errorf("test error"),
|
err: fmt.Errorf("test error"),
|
||||||
}
|
}
|
||||||
kubelet.handlerRunner = lifecycle.NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
runtimeHooks := newKubeletRuntimeHooks(kubelet.recorder)
|
||||||
|
kubelet.containerManager = dockertools.NewDockerManager(kubelet.dockerClient, kubelet.recorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil, kubelet, kubelet.httpClient, runtimeHooks)
|
||||||
|
|
||||||
pods := []*api.Pod{
|
pods := []*api.Pod{
|
||||||
{
|
{
|
||||||
@ -1810,7 +1704,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
|||||||
// Get pod status.
|
// Get pod status.
|
||||||
"list", "inspect_container", "inspect_image",
|
"list", "inspect_container", "inspect_image",
|
||||||
// Check the pod infra container.
|
// Check the pod infra container.
|
||||||
"inspect_container",
|
"inspect_container", "inspect_image",
|
||||||
// Create the container.
|
// Create the container.
|
||||||
"create", "start",
|
"create", "start",
|
||||||
// Kill the container since event handler fails.
|
// Kill the container since event handler fails.
|
||||||
@ -1820,7 +1714,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
|||||||
|
|
||||||
// TODO(yifan): Check the stopped container's name.
|
// TODO(yifan): Check the stopped container's name.
|
||||||
if len(fakeDocker.Stopped) != 1 {
|
if len(fakeDocker.Stopped) != 1 {
|
||||||
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
|
t.Fatalf("Wrong containers were stopped: %v", fakeDocker.Stopped)
|
||||||
}
|
}
|
||||||
dockerName, _, err := dockertools.ParseDockerName(fakeDocker.Stopped[0])
|
dockerName, _, err := dockertools.ParseDockerName(fakeDocker.Stopped[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -4065,6 +3959,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(vmarmol): Move this test away from using RunContainer().
|
||||||
func TestGetPodCreationFailureReason(t *testing.T) {
|
func TestGetPodCreationFailureReason(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
@ -4089,7 +3984,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
|
|||||||
pods := []*api.Pod{pod}
|
pods := []*api.Pod{pod}
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{})
|
kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{})
|
||||||
_, err := kubelet.containerManager.RunContainer(pod, &pod.Spec.Containers[0], kubelet, kubelet.handlerRunner, "", "")
|
_, err := kubelet.containerManager.RunContainer(pod, &pod.Spec.Containers[0], "", "")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error, found nil")
|
t.Errorf("expected error, found nil")
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,6 @@ import (
|
|||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
||||||
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -33,11 +32,15 @@ import (
|
|||||||
type HandlerRunner struct {
|
type HandlerRunner struct {
|
||||||
httpGetter kubeletTypes.HttpGetter
|
httpGetter kubeletTypes.HttpGetter
|
||||||
commandRunner prober.ContainerCommandRunner
|
commandRunner prober.ContainerCommandRunner
|
||||||
containerManager *dockertools.DockerManager
|
containerManager podStatusProvider
|
||||||
|
}
|
||||||
|
|
||||||
|
type podStatusProvider interface {
|
||||||
|
GetPodStatus(pod *api.Pod) (*api.PodStatus, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface.
|
// TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface.
|
||||||
func NewHandlerRunner(httpGetter kubeletTypes.HttpGetter, commandRunner prober.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner {
|
func NewHandlerRunner(httpGetter kubeletTypes.HttpGetter, commandRunner prober.ContainerCommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
|
||||||
return &HandlerRunner{
|
return &HandlerRunner{
|
||||||
httpGetter: httpGetter,
|
httpGetter: httpGetter,
|
||||||
commandRunner: commandRunner,
|
commandRunner: commandRunner,
|
||||||
|
@ -17,9 +17,13 @@ limitations under the License.
|
|||||||
package lifecycle
|
package lifecycle
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -67,3 +71,120 @@ func TestResolvePortStringUnknown(t *testing.T) {
|
|||||||
t.Error("unexpected non-error")
|
t.Error("unexpected non-error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fakeContainerCommandRunner struct {
|
||||||
|
Cmd []string
|
||||||
|
ID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeContainerCommandRunner) RunInContainer(id string, cmd []string) ([]byte, error) {
|
||||||
|
f.Cmd = cmd
|
||||||
|
f.ID = id
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeContainerCommandRunner) ExecInContainer(id string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunHandlerExec(t *testing.T) {
|
||||||
|
fakeCommandRunner := fakeContainerCommandRunner{}
|
||||||
|
handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil)
|
||||||
|
|
||||||
|
containerID := "abc1234"
|
||||||
|
containerName := "containerFoo"
|
||||||
|
|
||||||
|
container := api.Container{
|
||||||
|
Name: containerName,
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PostStart: &api.Handler{
|
||||||
|
Exec: &api.ExecAction{
|
||||||
|
Command: []string{"ls", "-a"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pod := api.Pod{}
|
||||||
|
pod.ObjectMeta.Name = "podFoo"
|
||||||
|
pod.ObjectMeta.Namespace = "nsFoo"
|
||||||
|
pod.Spec.Containers = []api.Container{container}
|
||||||
|
err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if fakeCommandRunner.ID != containerID ||
|
||||||
|
!reflect.DeepEqual(container.Lifecycle.PostStart.Exec.Command, fakeCommandRunner.Cmd) {
|
||||||
|
t.Errorf("unexpected commands: %v", fakeCommandRunner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeHTTP struct {
|
||||||
|
url string
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
|
||||||
|
f.url = url
|
||||||
|
return nil, f.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunHandlerHttp(t *testing.T) {
|
||||||
|
fakeHttp := fakeHTTP{}
|
||||||
|
handlerRunner := NewHandlerRunner(&fakeHttp, &fakeContainerCommandRunner{}, nil)
|
||||||
|
|
||||||
|
containerID := "abc1234"
|
||||||
|
containerName := "containerFoo"
|
||||||
|
|
||||||
|
container := api.Container{
|
||||||
|
Name: containerName,
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PostStart: &api.Handler{
|
||||||
|
HTTPGet: &api.HTTPGetAction{
|
||||||
|
Host: "foo",
|
||||||
|
Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
|
||||||
|
Path: "bar",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
pod := api.Pod{}
|
||||||
|
pod.ObjectMeta.Name = "podFoo"
|
||||||
|
pod.ObjectMeta.Namespace = "nsFoo"
|
||||||
|
pod.Spec.Containers = []api.Container{container}
|
||||||
|
err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if fakeHttp.url != "http://foo:8080/bar" {
|
||||||
|
t.Errorf("unexpected url: %s", fakeHttp.url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunHandlerNil(t *testing.T) {
|
||||||
|
handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, nil)
|
||||||
|
containerID := "abc1234"
|
||||||
|
podName := "podFoo"
|
||||||
|
podNamespace := "nsFoo"
|
||||||
|
containerName := "containerFoo"
|
||||||
|
|
||||||
|
container := api.Container{
|
||||||
|
Name: containerName,
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PostStart: &api.Handler{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
pod := api.Pod{}
|
||||||
|
pod.ObjectMeta.Name = podName
|
||||||
|
pod.ObjectMeta.Namespace = podNamespace
|
||||||
|
pod.Spec.Containers = []api.Container{container}
|
||||||
|
err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expect error, but got nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -43,7 +43,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
|||||||
fakeDocker := &dockertools.FakeDockerClient{}
|
fakeDocker := &dockertools.FakeDockerClient{}
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
|
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}, nil, nil, newKubeletRuntimeHooks(fakeRecorder))
|
||||||
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
|
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
|
||||||
|
|
||||||
lock := sync.Mutex{}
|
lock := sync.Mutex{}
|
||||||
|
@ -90,7 +90,6 @@ func TestRunOnce(t *testing.T) {
|
|||||||
os: kubecontainer.FakeOS{},
|
os: kubecontainer.FakeOS{},
|
||||||
volumeManager: newVolumeManager(),
|
volumeManager: newVolumeManager(),
|
||||||
}
|
}
|
||||||
kb.runtimeHooks = newKubeletRuntimeHooks(kb.recorder)
|
|
||||||
|
|
||||||
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
if err := kb.setupDataDirs(); err != nil {
|
if err := kb.setupDataDirs(); err != nil {
|
||||||
@ -161,7 +160,10 @@ func TestRunOnce(t *testing.T) {
|
|||||||
"",
|
"",
|
||||||
kubecontainer.FakeOS{},
|
kubecontainer.FakeOS{},
|
||||||
kb.networkPlugin,
|
kb.networkPlugin,
|
||||||
&kubeletProber.FakeProber{})
|
&kubeletProber.FakeProber{},
|
||||||
|
kb,
|
||||||
|
nil,
|
||||||
|
newKubeletRuntimeHooks(kb.recorder))
|
||||||
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
|
|
||||||
pods := []*api.Pod{
|
pods := []*api.Pod{
|
||||||
|
Loading…
Reference in New Issue
Block a user