From 5f08555f457ac7ed8396e915e8c4d41a55e6539b Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Thu, 26 Mar 2015 11:59:41 -0700 Subject: [PATCH] kubelet: Refactor kubelet.runContainer. Push the run container logic into container runtime. --- pkg/kubelet/container/runtime.go | 25 ++++ pkg/kubelet/dockertools/docker.go | 172 ++++++++++++++++++++++ pkg/kubelet/dockertools/docker_test.go | 64 ++++++++ pkg/kubelet/kubelet.go | 196 ++++++------------------- pkg/kubelet/kubelet_test.go | 64 -------- 5 files changed, 304 insertions(+), 217 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 5d0ae92fc65..28426daeff6 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -89,6 +89,31 @@ type Container struct { Created int64 } +// RunContainerOptions specify the options which are necessary for running containers +type RunContainerOptions struct { + // The environment variables, they are in the form of 'key=value'. + Envs []string + // The mounts for the containers, they are in the form of: + // 'hostPath:containerPath', or + // 'hostPath:containerPath:ro', if the path read only. + Binds []string + // If the container has specified the TerminationMessagePath, then + // this directory will be used to create and mount the log file to + // container.TerminationMessagePath + PodContainerDir string + // The list of DNS servers for the container to use. + DNS []string + // The list of DNS search domains. + DNSSearch []string + // Docker namespace identifiers(currently we have 'NetMode' and 'IpcMode'. + // These are for docker to attach a container in a pod to the pod infra + // container's namespace. + // TODO(yifan): Remove these after we pushed the pod infra container logic + // into docker's container runtime. + NetMode string + IpcMode string +} + type Pods []*Pod // FindPodByID returns a pod in the pod list by UID. It will return an empty pod diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 8e3f5a99f83..ea1390817e1 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -27,10 +27,13 @@ import ( "math/rand" "os" "os/exec" + "path" "strconv" "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" @@ -46,6 +49,13 @@ const ( DockerPrefix = "docker://" ) +const ( + // Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc + minShares = 2 + sharesPerCPU = 1024 + milliCPUToCPU = 1000 +) + // DockerInterface is an abstract interface for testability. It abstracts the interface of docker.Client. type DockerInterface interface { ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) @@ -872,3 +882,165 @@ func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) { } return result, nil } + +func milliCPUToShares(milliCPU int64) int64 { + if milliCPU == 0 { + // zero milliCPU means unset. Use kernel default. + return 0 + } + // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. + shares := (milliCPU * sharesPerCPU) / milliCPUToCPU + if shares < minShares { + return minShares + } + return shares +} + +func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) { + exposedPorts := map[docker.Port]struct{}{} + portBindings := map[docker.Port][]docker.PortBinding{} + for _, port := range container.Ports { + exteriorPort := port.HostPort + if exteriorPort == 0 { + // No need to do port binding when HostPort is not specified + continue + } + interiorPort := port.ContainerPort + // Some of this port stuff is under-documented voodoo. + // See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api + var protocol string + switch strings.ToUpper(string(port.Protocol)) { + case "UDP": + protocol = "/udp" + case "TCP": + protocol = "/tcp" + default: + glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol) + protocol = "/tcp" + } + dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol) + exposedPorts[dockerPort] = struct{}{} + portBindings[dockerPort] = []docker.PortBinding{ + { + HostPort: strconv.Itoa(exteriorPort), + HostIP: port.HostIP, + }, + } + } + return exposedPorts, portBindings +} + +func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) { + var ( + addCaps []string + dropCaps []string + ) + for _, cap := range capAdd { + addCaps = append(addCaps, string(cap)) + } + for _, cap := range capDrop { + dropCaps = append(dropCaps, string(cap)) + } + return addCaps, dropCaps +} + +// RunContainer creates and starts a docker container with the required RunContainerOptions. +// On success it will return the container's ID with nil error. During the process, it will +// use the reference and event recorder to report the state of the container (e.g. created, +// started, failed, etc.). +// TODO(yifan): To use a strong type for the returned container ID. +func RunContainer(client DockerInterface, container *api.Container, pod *api.Pod, opts *kubecontainer.RunContainerOptions, + refManager *kubecontainer.RefManager, ref *api.ObjectReference, recorder record.EventRecorder) (string, error) { + dockerName := KubeletContainerName{ + PodFullName: kubecontainer.GetPodFullName(pod), + PodUID: pod.UID, + ContainerName: container.Name, + } + exposedPorts, portBindings := makePortsAndBindings(container) + // TODO(vmarmol): Handle better. + // Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char). + const hostnameMaxLen = 63 + containerHostname := pod.Name + if len(containerHostname) > hostnameMaxLen { + containerHostname = containerHostname[:hostnameMaxLen] + } + dockerOpts := docker.CreateContainerOptions{ + Name: BuildDockerName(dockerName, container), + Config: &docker.Config{ + Cmd: container.Command, + Env: opts.Envs, + ExposedPorts: exposedPorts, + Hostname: containerHostname, + Image: container.Image, + Memory: container.Resources.Limits.Memory().Value(), + CPUShares: milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()), + WorkingDir: container.WorkingDir, + }, + } + dockerContainer, err := client.CreateContainer(dockerOpts) + if err != nil { + if ref != nil { + recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err) + } + return "", err + } + // Remember this reference so we can report events about this container + if ref != nil { + refManager.SetRef(dockerContainer.ID, ref) + recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) + } + + // The reason we create and mount the log file in here (not in kubelet) is because + // the file's location depends on the ID of the container, and we need to create and + // mount the file before actually starting the container. + // TODO(yifan): Consider to pull this logic out since we might need to reuse it in + // other container runtime. + if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 { + containerLogPath := path.Join(opts.PodContainerDir, dockerContainer.ID) + fs, err := os.Create(containerLogPath) + if err != nil { + // TODO: Clean up the previouly created dir? return the error? + glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err) + } else { + fs.Close() // Close immediately; we're just doing a `touch` here + b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath) + opts.Binds = append(opts.Binds, b) + } + } + + privileged := false + if capabilities.Get().AllowPrivileged { + privileged = container.Privileged + } else if container.Privileged { + return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.") + } + + capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop) + hc := &docker.HostConfig{ + PortBindings: portBindings, + Binds: opts.Binds, + NetworkMode: opts.NetMode, + IpcMode: opts.IpcMode, + Privileged: privileged, + CapAdd: capAdd, + CapDrop: capDrop, + } + if len(opts.DNS) > 0 { + hc.DNS = opts.DNS + } + if len(opts.DNSSearch) > 0 { + hc.DNSSearch = opts.DNSSearch + } + + if err = client.StartContainer(dockerContainer.ID, hc); err != nil { + if ref != nil { + recorder.Eventf(ref, "failed", + "Failed to start with docker id %v with error: %v", dockerContainer.ID, err) + } + return "", err + } + if ref != nil { + recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) + } + return dockerContainer.ID, nil +} diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 452144badf1..65963bd30ff 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -637,3 +637,67 @@ func TestFindContainersByPod(t *testing.T) { } } } + +func TestMakePortsAndBindings(t *testing.T) { + container := api.Container{ + Ports: []api.ContainerPort{ + { + ContainerPort: 80, + HostPort: 8080, + HostIP: "127.0.0.1", + }, + { + ContainerPort: 443, + HostPort: 443, + Protocol: "tcp", + }, + { + ContainerPort: 444, + HostPort: 444, + Protocol: "udp", + }, + { + ContainerPort: 445, + HostPort: 445, + Protocol: "foobar", + }, + }, + } + exposedPorts, bindings := makePortsAndBindings(&container) + if len(container.Ports) != len(exposedPorts) || + len(container.Ports) != len(bindings) { + t.Errorf("Unexpected ports and bindings, %#v %#v %#v", container, exposedPorts, bindings) + } + for key, value := range bindings { + switch value[0].HostPort { + case "8080": + if !reflect.DeepEqual(docker.Port("80/tcp"), key) { + t.Errorf("Unexpected docker port: %#v", key) + } + if value[0].HostIP != "127.0.0.1" { + t.Errorf("Unexpected host IP: %s", value[0].HostIP) + } + case "443": + if !reflect.DeepEqual(docker.Port("443/tcp"), key) { + t.Errorf("Unexpected docker port: %#v", key) + } + if value[0].HostIP != "" { + t.Errorf("Unexpected host IP: %s", value[0].HostIP) + } + case "444": + if !reflect.DeepEqual(docker.Port("444/udp"), key) { + t.Errorf("Unexpected docker port: %#v", key) + } + if value[0].HostIP != "" { + t.Errorf("Unexpected host IP: %s", value[0].HostIP) + } + case "445": + if !reflect.DeepEqual(docker.Port("445/tcp"), key) { + t.Errorf("Unexpected docker port: %#v", key) + } + if value[0].HostIP != "" { + t.Errorf("Unexpected host IP: %s", value[0].HostIP) + } + } + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 96dc93059a6..056e205d13c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -26,7 +26,6 @@ import ( "os" "path" "sort" - "strconv" "strings" "sync" "time" @@ -60,11 +59,6 @@ import ( ) const ( - // Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc - minShares = 2 - sharesPerCPU = 1024 - milliCPUToCPU = 1000 - // The oom_score_adj of the POD infrastructure container. The default is 0, so // any value below that makes it *less* likely to get OOM killed. podOomScoreAdj = -100 @@ -582,67 +576,6 @@ func makeBinds(container *api.Container, podVolumes volumeMap) []string { return binds } -func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) { - exposedPorts := map[docker.Port]struct{}{} - portBindings := map[docker.Port][]docker.PortBinding{} - for _, port := range container.Ports { - exteriorPort := port.HostPort - if exteriorPort == 0 { - // No need to do port binding when HostPort is not specified - continue - } - interiorPort := port.ContainerPort - // Some of this port stuff is under-documented voodoo. - // See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api - var protocol string - switch strings.ToUpper(string(port.Protocol)) { - case "UDP": - protocol = "/udp" - case "TCP": - protocol = "/tcp" - default: - glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol) - protocol = "/tcp" - } - dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol) - exposedPorts[dockerPort] = struct{}{} - portBindings[dockerPort] = []docker.PortBinding{ - { - HostPort: strconv.Itoa(exteriorPort), - HostIP: port.HostIP, - }, - } - } - return exposedPorts, portBindings -} - -func milliCPUToShares(milliCPU int64) int64 { - if milliCPU == 0 { - // zero milliCPU means unset. Use kernel default. - return 0 - } - // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. - shares := (milliCPU * sharesPerCPU) / milliCPUToCPU - if shares < minShares { - return minShares - } - return shares -} - -func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) { - var ( - addCaps []string - dropCaps []string - ) - for _, cap := range capAdd { - addCaps = append(addCaps, string(cap)) - } - for _, cap := range capDrop { - dropCaps = append(dropCaps, string(cap)) - } - return addCaps, dropCaps -} - // A basic interface that knows how to execute handlers type actionHandler interface { Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error @@ -668,51 +601,19 @@ func (kl *Kubelet) runHandler(podFullName string, uid types.UID, container *api. return actionHandler.Run(podFullName, uid, container, handler) } -// Run a single container from a pod. Returns the docker container ID -func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (id dockertools.DockerID, err error) { - ref, err := kl.containerRefManager.GenerateContainerRef(pod, container) - if err != nil { - glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) +// generateRunContainerOptions generates the RunContainerOptions, which can be used by +// the container runtime to set parameters for launching a container. +func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) { + var err error + opts := &kubecontainer.RunContainerOptions{ + NetMode: netMode, + IpcMode: ipcMode, } - envVariables, err := kl.makeEnvironmentVariables(pod.Namespace, container) + opts.Binds = makeBinds(container, podVolumes) + opts.Envs, err = kl.makeEnvironmentVariables(pod.Namespace, container) if err != nil { - return "", err - } - binds := makeBinds(container, podVolumes) - exposedPorts, portBindings := makePortsAndBindings(container) - - // TODO(vmarmol): Handle better. - // Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char). - const hostnameMaxLen = 63 - containerHostname := pod.Name - if len(containerHostname) > hostnameMaxLen { - containerHostname = containerHostname[:hostnameMaxLen] - } - opts := docker.CreateContainerOptions{ - Name: dockertools.BuildDockerName(dockertools.KubeletContainerName{kubecontainer.GetPodFullName(pod), pod.UID, container.Name}, container), - Config: &docker.Config{ - Cmd: container.Command, - Env: envVariables, - ExposedPorts: exposedPorts, - Hostname: containerHostname, - Image: container.Image, - Memory: container.Resources.Limits.Memory().Value(), - CPUShares: milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()), - WorkingDir: container.WorkingDir, - }, - } - dockerContainer, err := kl.dockerClient.CreateContainer(opts) - if err != nil { - if ref != nil { - kl.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err) - } - return "", err - } - // Remember this reference so we can report events about this container - if ref != nil { - kl.containerRefManager.SetRef(dockerContainer.ID, ref) - kl.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) + return nil, err } if len(container.TerminationMessagePath) != 0 { @@ -720,60 +621,45 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum if err := os.MkdirAll(p, 0750); err != nil { glog.Errorf("Error on creating %q: %v", p, err) } else { - containerLogPath := path.Join(p, dockerContainer.ID) - fs, err := os.Create(containerLogPath) - if err != nil { - // TODO: Clean up the previouly created dir? return the error? - glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err) - } else { - fs.Close() // Close immediately; we're just doing a `touch` here - b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath) - binds = append(binds, b) - } + opts.PodContainerDir = p } } - privileged := false - if capabilities.Get().AllowPrivileged { - privileged = container.Privileged - } else if container.Privileged { - return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.") - } - - capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop) - hc := &docker.HostConfig{ - PortBindings: portBindings, - Binds: binds, - NetworkMode: netMode, - IpcMode: ipcMode, - Privileged: privileged, - CapAdd: capAdd, - CapDrop: capDrop, - } if pod.Spec.DNSPolicy == api.DNSClusterFirst { - if err := kl.applyClusterDNS(hc, pod); err != nil { - return "", err + opts.DNS, opts.DNSSearch, err = kl.getClusterDNS(pod) + if err != nil { + return nil, err } } - err = kl.dockerClient.StartContainer(dockerContainer.ID, hc) + return opts, nil +} + +// Run a single container from a pod. Returns the docker container ID +func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (dockertools.DockerID, error) { + ref, err := kl.containerRefManager.GenerateContainerRef(pod, container) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) + } + + opts, err := kl.generateRunContainerOptions(pod, container, podVolumes, netMode, ipcMode) if err != nil { - if ref != nil { - kl.recorder.Eventf(ref, "failed", - "Failed to start with docker id %v with error: %v", dockerContainer.ID, err) - } return "", err } - if ref != nil { - kl.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) + + // TODO(yifan): Replace with RunContainerInPod, so we can eliminate 'netMode', 'ipcMode' + // by handling the pod infra container in the container runtime's implementation. + id, err := dockertools.RunContainer(kl.dockerClient, container, pod, opts, kl.containerRefManager, ref, kl.recorder) + if err != nil { + return "", err } if container.Lifecycle != nil && container.Lifecycle.PostStart != nil { handlerErr := kl.runHandler(kubecontainer.GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart) if handlerErr != nil { - kl.killContainerByID(dockerContainer.ID) + kl.killContainerByID(id) return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr) } } - return dockertools.DockerID(dockerContainer.ID), err + return dockertools.DockerID(id), err } var masterServices = util.NewStringSet("kubernetes", "kubernetes-ro") @@ -865,27 +751,31 @@ func (kl *Kubelet) makeEnvironmentVariables(ns string, container *api.Container) return result, nil } -func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.Pod) error { +// getClusterDNS returns a list of the DNS servers and a list of the DNS search +// domains of the cluster. +func (kl *Kubelet) getClusterDNS(pod *api.Pod) ([]string, []string, error) { // Get host DNS settings and append them to cluster DNS settings. f, err := os.Open("/etc/resolv.conf") if err != nil { - return err + return nil, nil, err } defer f.Close() hostDNS, hostSearch, err := parseResolvConf(f) if err != nil { - return err + return nil, nil, err } + var dns, dnsSearch []string + if kl.clusterDNS != nil { - hc.DNS = append([]string{kl.clusterDNS.String()}, hostDNS...) + dns = append([]string{kl.clusterDNS.String()}, hostDNS...) } if kl.clusterDomain != "" { nsDomain := fmt.Sprintf("%s.%s", pod.Namespace, kl.clusterDomain) - hc.DNSSearch = append([]string{nsDomain, kl.clusterDomain}, hostSearch...) + dnsSearch = append([]string{nsDomain, kl.clusterDomain}, hostSearch...) } - return nil + return dns, dnsSearch, nil } // Returns the list of DNS servers and DNS search domains. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9892f2fe323..6a31ad5cd5f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1201,70 +1201,6 @@ func TestMakeVolumesAndBinds(t *testing.T) { verifyStringArrayEquals(t, binds, expectedBinds) } -func TestMakePortsAndBindings(t *testing.T) { - container := api.Container{ - Ports: []api.ContainerPort{ - { - ContainerPort: 80, - HostPort: 8080, - HostIP: "127.0.0.1", - }, - { - ContainerPort: 443, - HostPort: 443, - Protocol: "tcp", - }, - { - ContainerPort: 444, - HostPort: 444, - Protocol: "udp", - }, - { - ContainerPort: 445, - HostPort: 445, - Protocol: "foobar", - }, - }, - } - exposedPorts, bindings := makePortsAndBindings(&container) - if len(container.Ports) != len(exposedPorts) || - len(container.Ports) != len(bindings) { - t.Errorf("Unexpected ports and bindings, %#v %#v %#v", container, exposedPorts, bindings) - } - for key, value := range bindings { - switch value[0].HostPort { - case "8080": - if !reflect.DeepEqual(docker.Port("80/tcp"), key) { - t.Errorf("Unexpected docker port: %#v", key) - } - if value[0].HostIP != "127.0.0.1" { - t.Errorf("Unexpected host IP: %s", value[0].HostIP) - } - case "443": - if !reflect.DeepEqual(docker.Port("443/tcp"), key) { - t.Errorf("Unexpected docker port: %#v", key) - } - if value[0].HostIP != "" { - t.Errorf("Unexpected host IP: %s", value[0].HostIP) - } - case "444": - if !reflect.DeepEqual(docker.Port("444/udp"), key) { - t.Errorf("Unexpected docker port: %#v", key) - } - if value[0].HostIP != "" { - t.Errorf("Unexpected host IP: %s", value[0].HostIP) - } - case "445": - if !reflect.DeepEqual(docker.Port("445/tcp"), key) { - t.Errorf("Unexpected docker port: %#v", key) - } - if value[0].HostIP != "" { - t.Errorf("Unexpected host IP: %s", value[0].HostIP) - } - } - } -} - type errorTestingDockerClient struct { dockertools.FakeDockerClient listContainersError error