Merge pull request #5702 from yifan-gu/get_pods

Introduce GetPods() to replace GetKubeletDockerContainers()
Dawn Chen 2015-03-24 17:13:32 -07:00
commit 1665f4bc87
19 changed files with 296 additions and 172 deletions

View File

@@ -25,6 +25,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
     utilerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
@@ -206,7 +207,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
         filtered := filterInvalidPods(update.Pods, source, s.recorder)
         for _, ref := range filtered {
-            name := kubelet.GetPodFullName(ref)
+            name := kubecontainer.GetPodFullName(ref)
             if existing, found := pods[name]; found {
                 if !reflect.DeepEqual(existing.Spec, ref.Spec) {
                     // this is an update
@@ -229,7 +230,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
     case kubelet.REMOVE:
         glog.V(4).Infof("Removing a pod %v", update)
         for _, value := range update.Pods {
-            name := kubelet.GetPodFullName(&value)
+            name := kubecontainer.GetPodFullName(&value)
             if existing, found := pods[name]; found {
                 // this is a delete
                 delete(pods, name)
@@ -248,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
         filtered := filterInvalidPods(update.Pods, source, s.recorder)
         for _, ref := range filtered {
-            name := kubelet.GetPodFullName(ref)
+            name := kubecontainer.GetPodFullName(ref)
             if existing, found := oldPods[name]; found {
                 pods[name] = existing
                 if !reflect.DeepEqual(existing.Spec, ref.Spec) {
@@ -306,7 +307,7 @@ func filterInvalidPods(pods []api.Pod, source string, recorder record.EventRecor
             // If validation fails, don't trust it any further -
             // even Name could be bad.
         } else {
-            name := kubelet.GetPodFullName(pod)
+            name := kubecontainer.GetPodFullName(pod)
             if names.Has(name) {
                 errlist = append(errlist, fielderrors.NewFieldDuplicate("name", pod.Name))
             } else {

View File

@@ -17,6 +17,9 @@ limitations under the License.
 package container
 
 import (
+    "fmt"
+    "strings"
+
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
@@ -32,11 +35,11 @@ type Runtime interface {
     // exited and dead containers (used for garbage collection).
     GetPods(all bool) ([]*Pod, error)
     // RunPod starts all the containers of a pod within a namespace.
-    RunPod(*api.Pod, map[string]volume.VolumePlugin) error
+    RunPod(*api.Pod, map[string]volume.Volume) error
     // KillPod kills all the containers of a pod.
     KillPod(*api.Pod) error
     // RunContainerInPod starts a container within the same namespace of a pod.
-    RunContainerInPod(api.Container, *api.Pod, map[string]volume.VolumePlugin) error
+    RunContainerInPod(api.Container, *api.Pod, map[string]volume.Volume) error
     // KillContainerInPod kills a container in the pod.
     KillContainerInPod(api.Container, *api.Pod) error
     // GetPodStatus retrieves the status of the pod, including the information of
@@ -81,4 +84,54 @@ type Container struct {
     // Hash of the container, used for comparison. Optional for containers
     // not managed by kubelet.
     Hash uint64
+    // The timestamp of the creation time of the container.
+    // TODO(yifan): Consider to move it to api.ContainerStatus.
+    Created int64
+}
+
+type Pods []*Pod
+
+// FindPodByID returns a pod in the pod list by UID. It will return an empty pod
+// if not found.
+// TODO(yifan): Use a map?
+func (p Pods) FindPodByID(podUID types.UID) Pod {
+    for i := range p {
+        if p[i].ID == podUID {
+            return *p[i]
+        }
+    }
+    return Pod{}
+}
+
+// FindContainerByName returns a container in the pod with the given name.
+// When there are multiple containers with the same name, the first match will
+// be returned.
+func (p *Pod) FindContainerByName(containerName string) *Container {
+    for _, c := range p.Containers {
+        if c.Name == containerName {
+            return c
+        }
+    }
+    return nil
+}
+
+// GetPodFullName returns a name that uniquely identifies a pod.
+func GetPodFullName(pod *api.Pod) string {
+    // Use underscore as the delimiter because it is not allowed in pod name
+    // (DNS subdomain format), while allowed in the container name format.
+    return fmt.Sprintf("%s_%s", pod.Name, pod.Namespace)
+}
+
+// Build the pod full name from pod name and namespace.
+func BuildPodFullName(name, namespace string) string {
+    return name + "_" + namespace
+}
+
+// Parse the pod full name.
+func ParsePodFullName(podFullName string) (string, string, error) {
+    parts := strings.Split(podFullName, "_")
+    if len(parts) != 2 {
+        return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
+    }
+    return parts[0], parts[1], nil
 }
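
The new naming helpers are small enough to sanity-check in isolation. Below is a standalone sketch, not part of the commit: the two tiny functions are copied from the hunk above (lower-cased so they compile outside the package) to show the build/parse round-trip and why "_" is a safe delimiter.

package main

import (
	"fmt"
	"strings"
)

// Copied from the diff above: "_" is not allowed in pod names
// (DNS subdomain format), so splitting on it is unambiguous.
func buildPodFullName(name, namespace string) string {
	return name + "_" + namespace
}

func parsePodFullName(podFullName string) (string, string, error) {
	parts := strings.Split(podFullName, "_")
	if len(parts) != 2 {
		return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
	}
	return parts[0], parts[1], nil
}

func main() {
	full := buildPodFullName("nginx", "default") // "nginx_default"
	name, namespace, err := parsePodFullName(full)
	fmt.Println(name, namespace, err) // nginx default <nil>
}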

View File

@@ -32,6 +32,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -816,3 +817,52 @@ type ContainerCommandRunner interface {
     ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
     PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error
 }
+
+func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) {
+    pods := make(map[types.UID]*kubecontainer.Pod)
+    var result []*kubecontainer.Pod
+
+    containers, err := GetKubeletDockerContainers(client, all)
+    if err != nil {
+        return nil, err
+    }
+
+    // Group containers by pod.
+    for _, c := range containers {
+        if len(c.Names) == 0 {
+            glog.Warningf("Cannot parse empty docker container name: %#v", c.Names)
+            continue
+        }
+        dockerName, hash, err := ParseDockerName(c.Names[0])
+        if err != nil {
+            glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err)
+            continue
+        }
+        pod, found := pods[dockerName.PodUID]
+        if !found {
+            name, namespace, err := kubecontainer.ParsePodFullName(dockerName.PodFullName)
+            if err != nil {
+                glog.Warningf("Parse pod full name %q error: %v", dockerName.PodFullName, err)
+                continue
+            }
+            pod = &kubecontainer.Pod{
+                ID:        dockerName.PodUID,
+                Name:      name,
+                Namespace: namespace,
+            }
+            pods[dockerName.PodUID] = pod
+        }
+        pod.Containers = append(pod.Containers, &kubecontainer.Container{
+            ID:      types.UID(c.ID),
+            Name:    dockerName.ContainerName,
+            Hash:    hash,
+            Created: c.Created,
+        })
+    }
+
+    // Convert map to list.
+    for _, c := range pods {
+        result = append(result, c)
+    }
+    return result, nil
+}
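
The core of the new GetPods() is a group-by on pod UID: each kubelet-managed Docker container is parsed into (pod UID, pod full name, container name, hash) and appended to the pod it belongs to, creating the pod entry the first time that UID is seen. A minimal, self-contained sketch of that grouping pattern follows; it uses local stand-in types only, not the dockertools or kubecontainer packages.

package main

import "fmt"

// Local stand-ins for kubecontainer.Container/Pod; illustration only.
type Container struct{ ID, Name string }

type Pod struct {
	UID, Name, Namespace string
	Containers           []*Container
}

// A parsed docker container, roughly what ParseDockerName yields in the real code.
type parsed struct {
	podUID, podName, namespace string
	containerName, containerID string
}

// groupByPod mirrors the structure of GetPods above: index pods by UID while
// walking the container list, then flatten the map into a slice.
func groupByPod(entries []parsed) []*Pod {
	byUID := make(map[string]*Pod)
	for _, e := range entries {
		p, found := byUID[e.podUID]
		if !found {
			p = &Pod{UID: e.podUID, Name: e.podName, Namespace: e.namespace}
			byUID[e.podUID] = p
		}
		p.Containers = append(p.Containers, &Container{ID: e.containerID, Name: e.containerName})
	}
	result := make([]*Pod, 0, len(byUID))
	for _, p := range byUID {
		result = append(result, p)
	}
	return result
}

func main() {
	pods := groupByPod([]parsed{
		{"uid-1", "web", "default", "POD", "a1"},
		{"uid-1", "web", "default", "nginx", "a2"},
		{"uid-2", "db", "default", "postgres", "b1"},
	})
	for _, p := range pods {
		fmt.Printf("%s/%s: %d container(s)\n", p.Namespace, p.Name, len(p.Containers))
	}
}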

View File

@@ -19,10 +19,12 @@ package dockertools
 import (
     "sync"
     "time"
+
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
 )
 
 type DockerCache interface {
-    RunningContainers() (DockerContainers, error)
+    GetPods() ([]*container.Pod, error)
     ForceUpdateIfOlder(time.Time) error
 }
@@ -43,7 +45,7 @@ type dockerCache struct {
     // Last time when cache was updated.
     cacheTime time.Time
     // The content of the cache.
-    containers DockerContainers
+    pods []*container.Pod
     // Whether the background thread updating the cache is running.
     updatingCache bool
     // Time when the background thread should be stopped.
@@ -53,15 +55,15 @@ type dockerCache struct {
 // Ensure that dockerCache abides by the DockerCache interface.
 var _ DockerCache = new(dockerCache)
 
-func (d *dockerCache) RunningContainers() (DockerContainers, error) {
+func (d *dockerCache) GetPods() ([]*container.Pod, error) {
     d.lock.Lock()
     defer d.lock.Unlock()
     if time.Since(d.cacheTime) > 2*time.Second {
-        containers, err := GetKubeletDockerContainers(d.client, false)
+        pods, err := GetPods(d.client, false)
         if err != nil {
-            return containers, err
+            return pods, err
         }
-        d.containers = containers
+        d.pods = pods
         d.cacheTime = time.Now()
     }
     // Stop refreshing thread if there were no requests within last 2 seconds.
@@ -70,18 +72,18 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) {
         d.updatingCache = true
         go d.startUpdatingCache()
     }
-    return d.containers, nil
+    return d.pods, nil
 }
 
 func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
     d.lock.Lock()
     defer d.lock.Unlock()
     if d.cacheTime.Before(minExpectedCacheTime) {
-        containers, err := GetKubeletDockerContainers(d.client, false)
+        pods, err := GetPods(d.client, false)
         if err != nil {
             return err
         }
-        d.containers = containers
+        d.pods = pods
         d.cacheTime = time.Now()
     }
     return nil
@@ -91,7 +93,7 @@ func (d *dockerCache) startUpdatingCache() {
     run := true
     for run {
         time.Sleep(100 * time.Millisecond)
-        containers, err := GetKubeletDockerContainers(d.client, false)
+        pods, err := GetPods(d.client, false)
         cacheTime := time.Now()
         if err != nil {
             continue
@@ -102,7 +104,7 @@ func (d *dockerCache) startUpdatingCache() {
             d.updatingCache = false
             run = false
         }
-        d.containers = containers
+        d.pods = pods
         d.cacheTime = cacheTime
         d.lock.Unlock()
     }
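
The cache change keeps the existing behavior and only swaps the cached value: GetPods() serves entries that are at most about 2 seconds old, refreshes inline when the entry is older, and a background goroutine keeps refreshing every 100ms while there is recent demand. A stripped-down sketch of just the time-based part of that pattern is shown below; the fetch function is a hypothetical placeholder, not the kubelet's real dockerCache.

package main

import (
	"fmt"
	"sync"
	"time"
)

// podCache caches the result of an expensive listing call for a short TTL,
// echoing the structure of dockerCache.GetPods above (minus the background
// refresher). fetch stands in for GetPods(client, false).
type podCache struct {
	mu        sync.Mutex
	fetch     func() ([]string, error)
	ttl       time.Duration
	cacheTime time.Time
	pods      []string
}

func (c *podCache) GetPods() ([]string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Since(c.cacheTime) > c.ttl {
		pods, err := c.fetch()
		if err != nil {
			return pods, err
		}
		c.pods = pods
		c.cacheTime = time.Now()
	}
	return c.pods, nil
}

func main() {
	calls := 0
	c := &podCache{
		ttl: 2 * time.Second,
		fetch: func() ([]string, error) {
			calls++
			return []string{"default/web", "default/db"}, nil
		},
	}
	c.GetPods()
	c.GetPods() // served from cache; fetch is not called again within the TTL
	fmt.Println("fetch calls:", calls) // 1
}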

View File

@@ -22,6 +22,7 @@ import (
     "sync"
     "time"
 
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/fsouza/go-dockerclient"
 )
@@ -249,8 +250,8 @@ func NewFakeDockerCache(client DockerInterface) DockerCache {
     }
 }
 
-func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) {
-    return GetKubeletDockerContainers(f.client, false)
+func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) {
+    return GetPods(f.client, false)
 }
 
 func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {

View File

@@ -39,6 +39,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
@@ -745,7 +746,7 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum
         containerHostname = containerHostname[:hostnameMaxLen]
     }
     opts := docker.CreateContainerOptions{
-        Name: dockertools.BuildDockerName(dockertools.KubeletContainerName{GetPodFullName(pod), pod.UID, container.Name}, container),
+        Name: dockertools.BuildDockerName(dockertools.KubeletContainerName{kubecontainer.GetPodFullName(pod), pod.UID, container.Name}, container),
         Config: &docker.Config{
             Cmd: container.Command,
             Env: envVariables,
@@ -822,7 +823,7 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum
     }
 
     if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
-        handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
+        handlerErr := kl.runHandler(kubecontainer.GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
         if handlerErr != nil {
             kl.killContainerByID(dockerContainer.ID)
             return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
@@ -977,8 +978,8 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
 }
 
 // Kill a docker container
-func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
-    return kl.killContainerByID(dockerContainer.ID)
+func (kl *Kubelet) killContainer(c *kubecontainer.Container) error {
+    return kl.killContainerByID(string(c.ID))
 }
 
 func (kl *Kubelet) killContainerByID(ID string) error {
@@ -1072,20 +1073,21 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
 }
 
 // Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs.
-func (kl *Kubelet) killContainersInPod(pod *api.Pod, dockerContainers dockertools.DockerContainers) (int, error) {
-    podFullName := GetPodFullName(pod)
+func (kl *Kubelet) killContainersInPod(pod *api.Pod, runningPod kubecontainer.Pod) (int, error) {
+    podFullName := kubecontainer.GetPodFullName(pod)
     count := 0
     errs := make(chan error, len(pod.Spec.Containers))
     wg := sync.WaitGroup{}
     for _, container := range pod.Spec.Containers {
         // TODO: Consider being more aggressive: kill all containers with this pod UID, period.
-        if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found {
+        c := runningPod.FindContainerByName(container.Name)
+        if c != nil {
             count++
             wg.Add(1)
             go func() {
                 defer util.HandleCrash()
-                err := kl.killContainer(dockerContainer)
+                err := kl.killContainer(c)
                 if err != nil {
                     glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, podFullName)
                     errs <- err
@@ -1124,7 +1126,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 }
 
 func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api.Pod) bool {
-    podFullName := GetPodFullName(pod)
+    podFullName := kubecontainer.GetPodFullName(pod)
     // Check RestartPolicy for dead container
     recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, pod.UID, container.Name)
     if err != nil {
@@ -1168,7 +1170,7 @@ func (kl *Kubelet) getPodInfraContainer(podFullName string, uid types.UID,
 // if it was successful, and a non-nil error otherwise.
 func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podVolumes *volumeMap,
     podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
-    podFullName := GetPodFullName(pod)
+    podFullName := kubecontainer.GetPodFullName(pod)
     ref, err := containerRef(pod, container)
     if err != nil {
         glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@@ -1217,8 +1219,8 @@ type podContainerChangesSpec struct {
     containersToKeep map[dockertools.DockerID]int
 }
 
-func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
-    podFullName := GetPodFullName(pod)
+func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, runningPod kubecontainer.Pod) (podContainerChangesSpec, error) {
+    podFullName := kubecontainer.GetPodFullName(pod)
     uid := pod.UID
     glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
@@ -1231,9 +1233,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
     containersToKeep := make(map[dockertools.DockerID]int)
     createPodInfraContainer := false
     var podStatus api.PodStatus
-    podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
-    if found {
+
+    var podInfraContainerID dockertools.DockerID
+    podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
+    if podInfraContainer != nil {
         glog.V(4).Infof("Found infra pod for %q", podFullName)
+        podInfraContainerID = dockertools.DockerID(podInfraContainer.ID)
         containersToKeep[podInfraContainerID] = -1
         podStatus, err = kl.GetPodStatus(podFullName)
         if err != nil {
@@ -1246,15 +1251,19 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
     for index, container := range pod.Spec.Containers {
         expectedHash := dockertools.HashContainer(&container)
 
-        if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
-            containerID := dockertools.DockerID(dockerContainer.ID)
+        c := runningPod.FindContainerByName(container.Name)
+        if c != nil {
+            containerID := dockertools.DockerID(c.ID)
+            hash := c.Hash
             glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
             if !createPodInfraContainer {
                 // look for changes in the container.
                 containerChanged := hash != 0 && hash != expectedHash
                 if !containerChanged {
-                    result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created)
+                    result, err := kl.probeContainer(pod, podStatus, container, string(c.ID), c.Created)
                     if err != nil {
                         // TODO(vmarmol): examine this logic.
                         glog.V(2).Infof("probe no-error: %q", container.Name)
@@ -1320,8 +1329,8 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
     }, nil
 }
 
-func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error {
-    podFullName := GetPodFullName(pod)
+func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod kubecontainer.Pod) error {
+    podFullName := kubecontainer.GetPodFullName(pod)
     uid := pod.UID
 
     // Before returning, regenerate status and store it in the cache.
@@ -1334,7 +1343,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
     }
     }()
 
-    containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod)
+    containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, runningPod)
     glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
     if err != nil {
         return err
@@ -1346,20 +1355,23 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
         } else {
             glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
         }
 
-        if podInfraContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
+        // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
+        // TODO(yifan): Replace with KillPod().
+        podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
+        if podInfraContainer != nil {
             if err := kl.killContainer(podInfraContainer); err != nil {
                 glog.Warningf("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
             }
         }
-        _, err = kl.killContainersInPod(pod, containersInPod)
+        _, err = kl.killContainersInPod(pod, runningPod)
         if err != nil {
             return err
         }
     } else {
         // Otherwise kill any containers in this pod which are not specified as ones to keep.
-        for id, container := range containersInPod {
-            _, keep := containerChanges.containersToKeep[id]
+        for _, container := range runningPod.Containers {
+            _, keep := containerChanges.containersToKeep[dockertools.DockerID(container.ID)]
             if !keep {
                 glog.V(3).Infof("Killing unwanted container %+v", container)
                 err = kl.killContainer(container)
@@ -1499,7 +1511,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
     // Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
     podFullNames := make(map[string]bool)
     for _, pod := range allPods {
-        podFullNames[GetPodFullName(&pod)] = true
+        podFullNames[kubecontainer.GetPodFullName(&pod)] = true
     }
     kl.statusManager.RemoveOrphanedStatuses(podFullNames)
@@ -1507,7 +1519,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
     kl.handleNotFittingPods(allPods)
     var pods []api.Pod
     for _, pod := range allPods {
-        status, ok := kl.statusManager.GetPodStatus(GetPodFullName(&pod))
+        status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(&pod))
         if ok && status.Phase == api.PodFailed {
             continue
         }
@@ -1516,28 +1528,21 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
     glog.V(4).Infof("Desired: %#v", pods)
     var err error
-    desiredContainers := make(map[dockertools.KubeletContainerName]empty)
     desiredPods := make(map[types.UID]empty)
 
-    dockerContainers, err := kl.dockerCache.RunningContainers()
+    runningPods, err := kl.dockerCache.GetPods()
     if err != nil {
-        glog.Errorf("Error listing containers: %#v", dockerContainers)
+        glog.Errorf("Error listing containers: %#v", err)
         return err
     }
 
     // Check for any containers that need starting
     for ix := range pods {
         pod := &pods[ix]
-        podFullName := GetPodFullName(pod)
+        podFullName := kubecontainer.GetPodFullName(pod)
         uid := pod.UID
         desiredPods[uid] = empty{}
-        // Add all containers (including net) to the map.
-        desiredContainers[dockertools.KubeletContainerName{podFullName, uid, dockertools.PodInfraContainerName}] = empty{}
-        for _, cont := range pod.Spec.Containers {
-            desiredContainers[dockertools.KubeletContainerName{podFullName, uid, cont.Name}] = empty{}
-        }
 
         // Run the sync in an async manifest worker.
         _, hasMirrorPod := mirrorPods[podFullName]
         kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() {
@@ -1561,31 +1566,27 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
     // Kill any containers we don't need.
     killed := []string{}
-    for ix := range dockerContainers {
-        // Don't kill containers that are in the desired pods.
-        dockerName, _, err := dockertools.ParseDockerName(dockerContainers[ix].Names[0])
-        _, found := desiredPods[dockerName.PodUID]
-        if err == nil && found {
+    for _, pod := range runningPods {
+        if _, found := desiredPods[pod.ID]; found {
             // syncPod() will handle this one.
             continue
         }
-        _, ok := desiredContainers[*dockerName]
-        if err != nil || !ok {
+
+        // Kill all the containers in the unidentified pod.
+        for _, c := range pod.Containers {
             // call the networking plugin for teardown
-            if dockerName.ContainerName == dockertools.PodInfraContainerName {
-                name, namespace, _ := ParsePodFullName(dockerName.PodFullName)
-                err := kl.networkPlugin.TearDownPod(namespace, name, dockertools.DockerID(dockerContainers[ix].ID))
+            if c.Name == dockertools.PodInfraContainerName {
+                err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(c.ID))
                 if err != nil {
                     glog.Errorf("Network plugin pre-delete method returned an error: %v", err)
                 }
             }
-            glog.V(1).Infof("Killing unwanted container %+v", *dockerName)
-            err = kl.killContainer(dockerContainers[ix])
+            glog.V(1).Infof("Killing unwanted container %+v", c)
+            err = kl.killContainer(c)
             if err != nil {
-                glog.Errorf("Error killing container %+v: %v", *dockerName, err)
+                glog.Errorf("Error killing container %+v: %v", c, err)
             } else {
-                killed = append(killed, dockerContainers[ix].ID)
+                killed = append(killed, string(c.ID))
             }
         }
     }
@@ -1639,7 +1640,7 @@ func checkHostPortConflicts(pods []api.Pod) (fitting []api.Pod, notFitting []api
     for i := range pods {
         pod := &pods[i]
         if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
-            glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
+            glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs)
             notFitting = append(notFitting, *pod)
             continue
         }
@@ -1686,21 +1687,21 @@ func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
     fitting, notFitting := checkHostPortConflicts(pods)
     for _, pod := range notFitting {
         kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
-        kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
+        kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
             Phase:   api.PodFailed,
             Message: "Pod cannot be started due to host port conflict"})
     }
     fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
     for _, pod := range notFitting {
         kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
-        kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
+        kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
            Phase:   api.PodFailed,
            Message: "Pod cannot be started due to node selector mismatch"})
     }
     fitting, notFitting = kl.checkCapacityExceeded(fitting)
     for _, pod := range notFitting {
         kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
-        kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
+        kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
            Phase:   api.PodFailed,
            Message: "Pod cannot be started due to exceeded capacity"})
     }
@@ -1982,7 +1983,7 @@ func (kl *Kubelet) generatePodStatus(podFullName string) (api.PodStatus, error)
 // By passing the pod directly, this method avoids pod lookup, which requires
 // grabbing a lock.
 func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
-    podFullName := GetPodFullName(pod)
+    podFullName := kubecontainer.GetPodFullName(pod)
     glog.V(3).Infof("Generating status for %q", podFullName)
     spec := &pod.Spec
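
With containers pre-grouped, the "kill anything we no longer want" pass in SyncPods no longer re-parses Docker names; it simply walks the running pods whose UID is not in the desired set and kills every container in them. A self-contained sketch of that selection logic follows, with local stand-in types and without the networking teardown.

package main

import "fmt"

type Container struct{ ID, Name string }

type Pod struct {
	UID        string
	Containers []*Container
}

// containersToKill mirrors the loop in SyncPods above: any running pod whose
// UID is absent from desiredPods has all of its containers killed; pods that
// are still desired are left to syncPod().
func containersToKill(running []*Pod, desired map[string]struct{}) []string {
	var doomed []string
	for _, p := range running {
		if _, found := desired[p.UID]; found {
			continue // syncPod() will handle this one.
		}
		for _, c := range p.Containers {
			doomed = append(doomed, c.ID)
		}
	}
	return doomed
}

func main() {
	running := []*Pod{
		{UID: "uid-1", Containers: []*Container{{ID: "a1", Name: "POD"}, {ID: "a2", Name: "nginx"}}},
		{UID: "uid-2", Containers: []*Container{{ID: "b1", Name: "POD"}}},
	}
	desired := map[string]struct{}{"uid-1": {}}
	fmt.Println(containersToKill(running, desired)) // [b1]
}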

View File

@@ -38,6 +38,8 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
@@ -87,8 +89,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     waitGroup := new(sync.WaitGroup)
     kubelet.podWorkers = newPodWorkers(
         fakeDockerCache,
-        func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error {
-            err := kubelet.syncPod(pod, hasMirrorPod, containers)
+        func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
+            err := kubelet.syncPod(pod, hasMirrorPod, runningPod)
             waitGroup.Done()
             return err
         },
@@ -313,15 +315,49 @@ func TestKubeletDirsCompat(t *testing.T) {
     }
 }
 
+func apiContainerToContainer(c docker.APIContainers) container.Container {
+    dockerName, hash, err := dockertools.ParseDockerName(c.Names[0])
+    if err != nil {
+        return container.Container{}
+    }
+    return container.Container{
+        ID:   types.UID(c.ID),
+        Name: dockerName.ContainerName,
+        Hash: hash,
+    }
+}
+
+func dockerContainersToPod(containers dockertools.DockerContainers) container.Pod {
+    var pod container.Pod
+    for _, c := range containers {
+        dockerName, hash, err := dockertools.ParseDockerName(c.Names[0])
+        if err != nil {
+            continue
+        }
+        pod.Containers = append(pod.Containers, &container.Container{
+            ID:    types.UID(c.ID),
+            Name:  dockerName.ContainerName,
+            Hash:  hash,
+            Image: c.Image,
+        })
+        // TODO(yifan): Only one evaluation is enough.
+        pod.ID = dockerName.PodUID
+        name, namespace, _ := kubecontainer.ParsePodFullName(dockerName.PodFullName)
+        pod.Name = name
+        pod.Namespace = namespace
+    }
+    return pod
+}
+
 func TestKillContainerWithError(t *testing.T) {
     containers := []docker.APIContainers{
         {
             ID:    "1234",
-            Names: []string{"/k8s_foo_qux_1234_42"},
+            Names: []string{"/k8s_foo_qux_new_1234_42"},
         },
         {
             ID:    "5678",
-            Names: []string{"/k8s_bar_qux_5678_42"},
+            Names: []string{"/k8s_bar_qux_new_5678_42"},
         },
     }
     fakeDocker := &dockertools.FakeDockerClient{
@@ -334,7 +370,8 @@ func TestKillContainerWithError(t *testing.T) {
         kubelet.readiness.set(c.ID, true)
     }
     kubelet.dockerClient = fakeDocker
-    err := kubelet.killContainer(&fakeDocker.ContainerList[0])
+    c := apiContainerToContainer(fakeDocker.ContainerList[0])
+    err := kubelet.killContainer(&c)
     if err == nil {
         t.Errorf("expected error, found nil")
     }
@@ -353,11 +390,11 @@ func TestKillContainer(t *testing.T) {
     containers := []docker.APIContainers{
         {
             ID:    "1234",
-            Names: []string{"/k8s_foo_qux_1234_42"},
+            Names: []string{"/k8s_foo_qux_new_1234_42"},
         },
         {
             ID:    "5678",
-            Names: []string{"/k8s_bar_qux_5678_42"},
+            Names: []string{"/k8s_bar_qux_new_5678_42"},
         },
     }
     testKubelet := newTestKubelet(t)
@@ -371,7 +408,8 @@ func TestKillContainer(t *testing.T) {
         kubelet.readiness.set(c.ID, true)
     }
 
-    err := kubelet.killContainer(&fakeDocker.ContainerList[0])
+    c := apiContainerToContainer(fakeDocker.ContainerList[0])
+    err := kubelet.killContainer(&c)
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -916,7 +954,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
     }
     pods := []api.Pod{bound}
     kubelet.podManager.SetPods(pods)
-    err := kubelet.syncPod(&bound, false, dockerContainers)
+    err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -958,7 +996,7 @@ func TestSyncPodBadHash(t *testing.T) {
     }
     pods := []api.Pod{bound}
     kubelet.podManager.SetPods(pods)
-    err := kubelet.syncPod(&bound, false, dockerContainers)
+    err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -1013,7 +1051,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
     }
     pods := []api.Pod{bound}
     kubelet.podManager.SetPods(pods)
-    err := kubelet.syncPod(&bound, false, dockerContainers)
+    err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -1499,7 +1537,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
     podNamespace := "nsFoo"
     containerName := "containerFoo"
     output, err := kubelet.RunInContainer(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
         "",
         containerName,
         []string{"ls"})
@@ -1532,7 +1570,7 @@ func TestRunInContainer(t *testing.T) {
     cmd := []string{"ls"}
     _, err := kubelet.RunInContainer(
-        GetPodFullName(&api.Pod{
+        kubecontainer.GetPodFullName(&api.Pod{
             ObjectMeta: api.ObjectMeta{
                 UID:  "12345678",
                 Name: podName,
@@ -1704,7 +1742,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
     }
     pods := []api.Pod{bound}
     kubelet.podManager.SetPods(pods)
-    err := kubelet.syncPod(&bound, false, dockerContainers)
+    err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -2566,7 +2604,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
     podNamespace := "nsFoo"
     containerName := "containerFoo"
     err := kubelet.ExecInContainer(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
        "",
        containerName,
        []string{"ls"},
@@ -2602,7 +2640,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
     }
     err := kubelet.ExecInContainer(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
            UID:       "12345678",
            Name:      podName,
            Namespace: podNamespace,
@@ -2661,7 +2699,7 @@ func TestExecInContainer(t *testing.T) {
     }
     err := kubelet.ExecInContainer(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
            UID:       "12345678",
            Name:      podName,
            Namespace: podNamespace,
@@ -2710,7 +2748,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
     var port uint16 = 5000
     err := kubelet.PortForward(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
        "",
        port,
        nil,
@@ -2742,7 +2780,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) {
     }
     err := kubelet.PortForward(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
            UID:       "12345678",
            Name:      podName,
            Namespace: podNamespace,
@@ -2787,7 +2825,7 @@ func TestPortForward(t *testing.T) {
     }
     err := kubelet.PortForward(
-        GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
+        kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
            UID:       "12345678",
            Name:      podName,
            Namespace: podNamespace,
@@ -2863,7 +2901,7 @@ func TestHandlePortConflicts(t *testing.T) {
     pods[1].CreationTimestamp = util.NewTime(time.Now())
     pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
     // The newer pod should be rejected.
-    conflictedPodName := GetPodFullName(&pods[0])
+    conflictedPodName := kubecontainer.GetPodFullName(&pods[0])
 
     kl.handleNotFittingPods(pods)
     // Check pod status stored in the status map.
@@ -2913,7 +2951,7 @@ func TestHandleNodeSelector(t *testing.T) {
        },
     }
     // The first pod should be rejected.
-    notfittingPodName := GetPodFullName(&pods[0])
+    notfittingPodName := kubecontainer.GetPodFullName(&pods[0])
 
     kl.handleNotFittingPods(pods)
     // Check pod status stored in the status map.
@@ -2969,7 +3007,7 @@ func TestHandleMemExceeded(t *testing.T) {
     pods[1].CreationTimestamp = util.NewTime(time.Now())
     pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
     // The newer pod should be rejected.
-    notfittingPodName := GetPodFullName(&pods[0])
+    notfittingPodName := kubecontainer.GetPodFullName(&pods[0])
 
     kl.handleNotFittingPods(pods)
     // Check pod status stored in the status map.
@@ -3004,12 +3042,12 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
     }
     // Run once to populate the status map.
     kl.handleNotFittingPods(pods)
-    if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err != nil {
+    if _, err := kl.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err != nil {
        t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
     }
     // Sync with empty pods so that the entry in status map will be removed.
     kl.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
-    if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err == nil {
+    if _, err := kl.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err == nil {
        t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
     }
 }
@@ -3250,11 +3288,11 @@ func TestCreateMirrorPod(t *testing.T) {
     pods := []api.Pod{pod}
     kl.podManager.SetPods(pods)
     hasMirrorPod := false
-    err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{})
+    err := kl.syncPod(&pod, hasMirrorPod, container.Pod{})
     if err != nil {
        t.Errorf("unexpected error: %v", err)
     }
-    podFullName := GetPodFullName(&pod)
+    podFullName := kubecontainer.GetPodFullName(&pod)
     if !manager.HasPod(podFullName) {
        t.Errorf("expected mirror pod %q to be created", podFullName)
     }
@@ -3304,7 +3342,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
        t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
     }
     for _, pod := range orphanPods {
-        name := GetPodFullName(&pod)
+        name := kubecontainer.GetPodFullName(&pod)
        creates, deletes := manager.GetCounts(name)
        if creates != 0 || deletes != 1 {
            t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)

View File

@@ -21,7 +21,6 @@ import (
     "time"
 
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/golang/glog"
     "github.com/prometheus/client_golang/prometheus"
 )
@@ -139,22 +138,16 @@ func (self *podAndContainerCollector) Describe(ch chan<- *prometheus.Desc) {
 }
 
 func (self *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) {
-    runningContainers, err := self.containerCache.RunningContainers()
+    runningPods, err := self.containerCache.GetPods()
     if err != nil {
         glog.Warning("Failed to get running container information while collecting metrics: %v", err)
         return
     }
 
-    // Get a set of running pods.
-    runningPods := make(map[types.UID]struct{})
-    for _, cont := range runningContainers {
-        containerName, _, err := dockertools.ParseDockerName(cont.Names[0])
-        if err != nil {
-            continue
-        }
-        runningPods[containerName.PodUID] = struct{}{}
+    runningContainers := 0
+    for _, p := range runningPods {
+        runningContainers += len(p.Containers)
     }
 
     ch <- prometheus.MustNewConstMetric(
         runningPodCountDesc,
         prometheus.GaugeValue,
@@ -162,5 +155,5 @@ func (self *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) {
     ch <- prometheus.MustNewConstMetric(
         runningContainerCountDesc,
         prometheus.GaugeValue,
-        float64(len(runningContainers)))
+        float64(runningContainers))
 }

View File

@@ -21,6 +21,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/golang/glog"
 )
@@ -59,7 +60,7 @@ func (self *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
     if self.apiserverClient == nil {
         return nil
     }
-    name, namespace, err := ParsePodFullName(podFullName)
+    name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
     if err != nil {
         glog.Errorf("Failed to parse a pod full name %q", podFullName)
         return err

View File

@@ -21,6 +21,7 @@ import (
     "testing"
 
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 )
@@ -36,7 +37,7 @@ type fakeMirrorClient struct {
 func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod, _ string) error {
     self.mirrorPodLock.Lock()
     defer self.mirrorPodLock.Unlock()
-    podFullName := GetPodFullName(&pod)
+    podFullName := kubecontainer.GetPodFullName(&pod)
     self.mirrorPods.Insert(podFullName)
     self.createCounts[podFullName]++
     return nil
@@ -95,7 +96,7 @@ func TestParsePodFullName(t *testing.T) {
     failedCases := []string{"barfoo", "bar_foo_foo", ""}
 
     for podFullName, expected := range successfulCases {
-        name, namespace, err := ParsePodFullName(podFullName)
+        name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
         if err != nil {
             t.Errorf("unexpected error when parsing the full name: %v", err)
             continue
@@ -106,7 +107,7 @@ func TestParsePodFullName(t *testing.T) {
         }
     }
     for _, podFullName := range failedCases {
-        _, _, err := ParsePodFullName(podFullName)
+        _, _, err := kubecontainer.ParsePodFullName(podFullName)
         if err == nil {
             t.Errorf("expected error when parsing the full name, got none")
         }

View File

@@ -21,6 +21,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/golang/glog"
@@ -139,7 +140,7 @@ func (self *basicPodManager) setPods(newPods []api.Pod) {
     for i := range newPods {
         pod := newPods[i]
-        podFullName := GetPodFullName(&pod)
+        podFullName := kubecontainer.GetPodFullName(&pod)
         if isMirrorPod(&pod) {
             mirrorPodByUID[pod.UID] = &pod
             mirrorPodByFullName[podFullName] = &pod
@@ -207,7 +208,7 @@ func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.P
 // GetPodByName provides the (non-mirror) pod that matches namespace and name,
 // as well as whether the pod was found.
 func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
-    podFullName := BuildPodFullName(name, namespace)
+    podFullName := kubecontainer.BuildPodFullName(name, namespace)
     return self.GetPodByFullName(podFullName)
 }
@@ -234,7 +235,7 @@ func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID {
     self.lock.RLock()
     defer self.lock.RUnlock()
     if mirrorPod, ok := self.mirrorPodByUID[uid]; ok {
-        podFullName := GetPodFullName(mirrorPod)
+        podFullName := kubecontainer.GetPodFullName(mirrorPod)
         if pod, ok := self.podByFullName[podFullName]; ok {
             return pod.UID
         }

View File

@@ -21,6 +21,7 @@ import (
     "testing"
 
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
 )
 
 // Stub out mirror client for testing purpose.
@@ -78,9 +79,9 @@ func TestGetSetPods(t *testing.T) {
     } else if !reflect.DeepEqual(&mirrorPod, actualPod) {
         t.Errorf("mirror pod is recorded incorrectly. expect: %v, got: %v", mirrorPod, actualPod)
     }
-    actualPod, ok = podManager.mirrorPodByFullName[GetPodFullName(&mirrorPod)]
+    actualPod, ok = podManager.mirrorPodByFullName[kubecontainer.GetPodFullName(&mirrorPod)]
     if !ok {
-        t.Errorf("mirror pod %q is not found in the mirror pod map by full name", GetPodFullName(&mirrorPod))
+        t.Errorf("mirror pod %q is not found in the mirror pod map by full name", kubecontainer.GetPodFullName(&mirrorPod))
     } else if !reflect.DeepEqual(&mirrorPod, actualPod) {
         t.Errorf("mirror pod is recorded incorrectly. expect: %v, got: %v", mirrorPod, actualPod)
     }

View File

@@ -22,13 +22,14 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/golang/glog"
 )

-type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error
+type syncPodFnType func(*api.Pod, bool, container.Pod) error

 type podWorkers struct {
     // Protects all per worker fields.
@@ -90,14 +91,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
             glog.Errorf("Error updating docker cache: %v", err)
             return
         }
-        containers, err := p.dockerCache.RunningContainers()
+        pods, err := p.dockerCache.GetPods()
         if err != nil {
-            glog.Errorf("Error listing containers while syncing pod: %v", err)
+            glog.Errorf("Error getting pods while syncing pod: %v", err)
             return
         }
         err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod,
-            containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
+            container.Pods(pods).FindPodByID(newWork.pod.UID))
         if err != nil {
             glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
             p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
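
Before this change the pod worker resolved a pod's containers with DockerContainers.FindContainersByPod, keyed by UID and full name; it now asks the docker cache for pods and picks one by UID alone. Below is a minimal, self-contained sketch of that lookup pattern. The Container, Pod, and Pods types here are simplified stand-ins for the real pkg/kubelet/container types, not the actual package, so treat the snippet as an illustration of the shape of the new call rather than the kubelet's code.

package main

import "fmt"

// Simplified stand-ins for the kubecontainer types introduced by this PR;
// the real Pod also carries Name, Namespace, and richer container data.
type Container struct {
    ID   string
    Name string
}

type Pod struct {
    ID         string // pod UID
    Containers []*Container
}

type Pods []*Pod

// FindPodByID returns the pod with the given UID, or an empty Pod if none
// matches, mirroring the lookup the pod worker now performs before syncPod.
func (p Pods) FindPodByID(id string) Pod {
    for i := range p {
        if p[i].ID == id {
            return *p[i]
        }
    }
    return Pod{}
}

func main() {
    running := Pods{
        {ID: "uid-1", Containers: []*Container{{ID: "c1", Name: "nginx"}}},
        {ID: "uid-2", Containers: []*Container{{ID: "c2", Name: "etcd"}}},
    }
    pod := running.FindPodByID("uid-2")
    fmt.Printf("found pod %q with %d container(s)\n", pod.ID, len(pod.Containers))
}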


@@ -23,6 +23,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
 )
@@ -46,7 +47,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
     podWorkers := newPodWorkers(
         fakeDockerCache,
-        func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error {
+        func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
             func() {
                 lock.Lock()
                 defer lock.Unlock()


@@ -24,6 +24,7 @@ import (
     "time"

     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
     execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec"
     httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http"
@@ -189,7 +190,7 @@ type execInContainer struct {
 func (kl *Kubelet) newExecInContainer(pod *api.Pod, container api.Container) exec.Cmd {
     uid := pod.UID
-    podFullName := GetPodFullName(pod)
+    podFullName := kubecontainer.GetPodFullName(pod)
     return execInContainer{func() ([]byte, error) {
         return kl.RunInContainer(podFullName, uid, container.Name, container.LivenessProbe.Exec.Command)
     }}


@@ -21,6 +21,7 @@ import (
     "time"

     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
     "github.com/golang/glog"
 )
@@ -91,11 +92,12 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
     delay := retryDelay
     retry := 0
     for {
-        dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
+        pods, err := dockertools.GetPods(kl.dockerClient, false)
         if err != nil {
-            return fmt.Errorf("failed to get kubelet docker containers: %v", err)
+            return fmt.Errorf("failed to get kubelet pods: %v", err)
         }
-        running, err := kl.isPodRunning(pod, dockerContainers)
+        p := container.Pods(pods).FindPodByID(pod.UID)
+        running, err := kl.isPodRunning(pod, p)
         if err != nil {
             return fmt.Errorf("failed to check pod status: %v", err)
         }
@@ -106,7 +108,7 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
         glog.Infof("pod %q containers not running: syncing", pod.Name)
         // We don't create mirror pods in this mode; pass a dummy boolean value
         // to sycnPod.
-        if err = kl.syncPod(&pod, false, dockerContainers); err != nil {
+        if err = kl.syncPod(&pod, false, p); err != nil {
             return fmt.Errorf("error syncing pod: %v", err)
         }
         if retry >= RunOnceMaxRetries {
@@ -121,14 +123,14 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
 }

 // isPodRunning returns true if all containers of a manifest are running.
-func (kl *Kubelet) isPodRunning(pod api.Pod, dockerContainers dockertools.DockerContainers) (bool, error) {
+func (kl *Kubelet) isPodRunning(pod api.Pod, runningPod container.Pod) (bool, error) {
     for _, container := range pod.Spec.Containers {
-        dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.UID, container.Name)
-        if !found {
+        c := runningPod.FindContainerByName(container.Name)
+        if c == nil {
             glog.Infof("container %q not found", container.Name)
             return false, nil
         }
-        inspectResult, err := kl.dockerClient.InspectContainer(dockerContainer.ID)
+        inspectResult, err := kl.dockerClient.InspectContainer(string(c.ID))
         if err != nil {
             glog.Infof("failed to inspect container %q: %v", container.Name, err)
             return false, err
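
With this change isPodRunning checks each desired container by name against the running container.Pod and inspects it through a string-typed container ID instead of a Docker-specific struct. A rough, self-contained illustration of that check is below; the types and the inspect callback are stand-ins (the real code calls kl.dockerClient.InspectContainer), so this is a sketch of the pattern, not the kubelet's implementation.

package main

import "fmt"

// Stand-ins for the kubecontainer types; ContainerID is a distinct string
// type, which is why the real code converts with string(c.ID) before
// handing the ID to the Docker client.
type ContainerID string

type Container struct {
    ID   ContainerID
    Name string
}

type Pod struct {
    Containers []*Container
}

// FindContainerByName returns the running container with the given name,
// or nil if the pod has no such container.
func (p Pod) FindContainerByName(name string) *Container {
    for _, c := range p.Containers {
        if c.Name == name {
            return c
        }
    }
    return nil
}

// allRunning mimics the shape of isPodRunning: every desired container must
// be present and pass the inspect callback, a placeholder for InspectContainer.
func allRunning(desired []string, running Pod, inspect func(id string) bool) bool {
    for _, name := range desired {
        c := running.FindContainerByName(name)
        if c == nil {
            fmt.Printf("container %q not found\n", name)
            return false
        }
        if !inspect(string(c.ID)) {
            return false
        }
    }
    return true
}

func main() {
    pod := Pod{Containers: []*Container{{ID: "abc123", Name: "web"}}}
    ok := allRunning([]string{"web", "sidecar"}, pod, func(string) bool { return true })
    fmt.Println("all running:", ok)
}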


@@ -35,6 +35,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
@@ -251,7 +252,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
     }
     w.Header().Set("Transfer-Encoding", "chunked")
     w.WriteHeader(http.StatusOK)
-    err = s.host.GetKubeletContainerLogs(GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
+    err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
     if err != nil {
         s.error(w, err)
         return
@@ -303,7 +304,7 @@ func (s *Server) handlePodStatus(w http.ResponseWriter, req *http.Request, versi
         http.Error(w, "Pod does not exist", http.StatusNotFound)
         return
     }
-    status, err := s.host.GetPodStatus(GetPodFullName(pod))
+    status, err := s.host.GetPodStatus(kubecontainer.GetPodFullName(pod))
     if err != nil {
         s.error(w, err)
         return
@@ -407,7 +408,7 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
         return
     }
     command := strings.Split(u.Query().Get("cmd"), " ")
-    data, err := s.host.RunInContainer(GetPodFullName(pod), uid, container, command)
+    data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command)
     if err != nil {
         s.error(w, err)
         return
@@ -515,7 +516,7 @@ WaitForStreams:
         stdinStream.Close()
     }
-    err = s.host.ExecInContainer(GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty)
+    err = s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty)
     if err != nil {
         msg := fmt.Sprintf("Error executing command in container: %v", err)
         glog.Error(msg)
@@ -596,7 +597,7 @@ Loop:
     case "error":
         ch := make(chan httpstream.Stream)
         dataStreamChans[port] = ch
-        go waitForPortForwardDataStreamAndRun(GetPodFullName(pod), uid, stream, ch, s.host)
+        go waitForPortForwardDataStreamAndRun(kubecontainer.GetPodFullName(pod), uid, stream, ch, s.host)
     case "data":
         ch, ok := dataStreamChans[port]
         if ok {
@@ -678,14 +679,14 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
             http.Error(w, "Pod does not exist", http.StatusNotFound)
             return
         }
-        stats, err = s.host.GetContainerInfo(GetPodFullName(pod), "", components[2], &query)
+        stats, err = s.host.GetContainerInfo(kubecontainer.GetPodFullName(pod), "", components[2], &query)
     case 5:
         pod, ok := s.host.GetPodByName(components[1], components[2])
         if !ok {
             http.Error(w, "Pod does not exist", http.StatusNotFound)
             return
         }
-        stats, err = s.host.GetContainerInfo(GetPodFullName(pod), types.UID(components[3]), components[4], &query)
+        stats, err = s.host.GetContainerInfo(kubecontainer.GetPodFullName(pod), types.UID(components[3]), components[4], &query)
     default:
         http.Error(w, "unknown resource.", http.StatusNotFound)
         return


@@ -23,6 +23,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/golang/glog"
 )
@@ -101,7 +102,7 @@ func (s *statusManager) SyncBatch() {
         podFullName := syncRequest.podFullName
         status := syncRequest.status
         glog.V(3).Infof("Syncing status for %s", podFullName)
-        name, namespace, err := ParsePodFullName(podFullName)
+        name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
         if err != nil {
             glog.Warningf("Cannot parse pod full name %q: %s", podFullName, err)
         }


@@ -16,12 +16,7 @@ limitations under the License.

 package kubelet

-import (
-    "fmt"
-    "strings"
-
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
-)
+import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"

 const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
 const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror"
@@ -71,24 +66,3 @@ type PodUpdate struct {
     Op     PodOperation
     Source string
 }
-
-// GetPodFullName returns a name that uniquely identifies a pod.
-func GetPodFullName(pod *api.Pod) string {
-    // Use underscore as the delimiter because it is not allowed in pod name
-    // (DNS subdomain format), while allowed in the container name format.
-    return fmt.Sprintf("%s_%s", pod.Name, pod.Namespace)
-}
-
-// Build the pod full name from pod name and namespace.
-func BuildPodFullName(name, namespace string) string {
-    return name + "_" + namespace
-}
-
-// Parse the pod full name.
-func ParsePodFullName(podFullName string) (string, string, error) {
-    parts := strings.Split(podFullName, "_")
-    if len(parts) != 2 {
-        return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
-    }
-    return parts[0], parts[1], nil
-}
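
These helpers are removed from the kubelet package because they now live in the kubecontainer package, which is why callers above switch to kubecontainer.GetPodFullName, kubecontainer.BuildPodFullName, and kubecontainer.ParsePodFullName. Below is a standalone sketch of the same underscore naming scheme and a typical round trip; unlike the real GetPodFullName, which takes an *api.Pod, this version takes the name and namespace strings so the snippet has no Kubernetes dependencies.

package main

import (
    "fmt"
    "strings"
)

// GetPodFullName joins name and namespace with an underscore, which is safe
// because underscores are not allowed in pod names (DNS subdomain format).
func GetPodFullName(name, namespace string) string {
    return fmt.Sprintf("%s_%s", name, namespace)
}

// BuildPodFullName builds the full name from its two parts.
func BuildPodFullName(name, namespace string) string {
    return name + "_" + namespace
}

// ParsePodFullName splits a full name back into name and namespace.
func ParsePodFullName(podFullName string) (string, string, error) {
    parts := strings.Split(podFullName, "_")
    if len(parts) != 2 {
        return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
    }
    return parts[0], parts[1], nil
}

func main() {
    full := BuildPodFullName("nginx", "default") // "nginx_default"
    name, namespace, err := ParsePodFullName(full)
    fmt.Println(full, name, namespace, err)
}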