diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 0e1f1439a49..801febf2f83 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -32,6 +32,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -816,3 +817,61 @@ type ContainerCommandRunner interface { ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error } + +// Parse the pod full name. TODO(yifan): This is duplicated with kubelet.ParsePodFullName. +func parsePodFullName(podFullName string) (string, string, error) { + parts := strings.Split(podFullName, "_") + if len(parts) != 2 { + return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName) + } + return parts[0], parts[1], nil +} + +func GetPods(client DockerInterface, all bool) ([]*container.Pod, error) { + pods := make(map[types.UID]*container.Pod) + var result []*container.Pod + + containers, err := GetKubeletDockerContainers(client, all) + if err != nil { + return nil, err + } + + // Group containers by pod. + for _, c := range containers { + if len(c.Names) == 0 { + glog.Warningf("Cannog parse empty docker container name: %#v", c.Names) + continue + } + podFullName, podUID, containerName, hash, err := ParseDockerName(c.Names[0]) + if err != nil { + glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err) + continue + } + pod, found := pods[podUID] + if !found { + name, namespace, err := parsePodFullName(podFullName) + if err != nil { + glog.Warningf("Parse pod full name %q error: %v", podFullName, err) + continue + } + pod = &container.Pod{ + ID: podUID, + Name: name, + Namespace: namespace, + } + pods[podUID] = pod + } + pod.Containers = append(pod.Containers, &container.Container{ + ID: types.UID(c.ID), + Name: containerName, + Hash: hash, + Created: c.Created, + }) + } + + // Convert map to list. + for _, c := range pods { + result = append(result, c) + } + return result, nil +} diff --git a/pkg/kubelet/dockertools/docker_cache.go b/pkg/kubelet/dockertools/docker_cache.go index ed22fcb0b1c..d1fe0d6d4c4 100644 --- a/pkg/kubelet/dockertools/docker_cache.go +++ b/pkg/kubelet/dockertools/docker_cache.go @@ -19,10 +19,12 @@ package dockertools import ( "sync" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" ) type DockerCache interface { - RunningContainers() (DockerContainers, error) + GetPods() ([]*container.Pod, error) ForceUpdateIfOlder(time.Time) error } @@ -43,7 +45,7 @@ type dockerCache struct { // Last time when cache was updated. cacheTime time.Time // The content of the cache. - containers DockerContainers + pods []*container.Pod // Whether the background thread updating the cache is running. updatingCache bool // Time when the background thread should be stopped. @@ -53,15 +55,15 @@ type dockerCache struct { // Ensure that dockerCache abides by the DockerCache interface. var _ DockerCache = new(dockerCache) -func (d *dockerCache) RunningContainers() (DockerContainers, error) { +func (d *dockerCache) GetPods() ([]*container.Pod, error) { d.lock.Lock() defer d.lock.Unlock() if time.Since(d.cacheTime) > 2*time.Second { - containers, err := GetKubeletDockerContainers(d.client, false) + pods, err := GetPods(d.client, false) if err != nil { - return containers, err + return pods, err } - d.containers = containers + d.pods = pods d.cacheTime = time.Now() } // Stop refreshing thread if there were no requests within last 2 seconds. @@ -70,18 +72,18 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) { d.updatingCache = true go d.startUpdatingCache() } - return d.containers, nil + return d.pods, nil } func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { d.lock.Lock() defer d.lock.Unlock() if d.cacheTime.Before(minExpectedCacheTime) { - containers, err := GetKubeletDockerContainers(d.client, false) + pods, err := GetPods(d.client, false) if err != nil { return err } - d.containers = containers + d.pods = pods d.cacheTime = time.Now() } return nil @@ -91,7 +93,7 @@ func (d *dockerCache) startUpdatingCache() { run := true for run { time.Sleep(100 * time.Millisecond) - containers, err := GetKubeletDockerContainers(d.client, false) + pods, err := GetPods(d.client, false) cacheTime := time.Now() if err != nil { continue @@ -102,7 +104,7 @@ func (d *dockerCache) startUpdatingCache() { d.updatingCache = false run = false } - d.containers = containers + d.pods = pods d.cacheTime = cacheTime d.lock.Unlock() } diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 1de714d9c45..ceb851ca4a1 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/fsouza/go-dockerclient" ) @@ -249,8 +250,8 @@ func NewFakeDockerCache(client DockerInterface) DockerCache { } } -func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) { - return GetKubeletDockerContainers(f.client, false) +func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) { + return GetPods(f.client, false) } func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {