Merge pull request #827 from brendandburns/kubelet

Add container hashing to the container name, and restart containers on changes.
This commit is contained in:
brendandburns 2014-08-08 14:17:11 -07:00
commit 4c00acdde0
3 changed files with 132 additions and 43 deletions

View File

@ -19,12 +19,15 @@ package kubelet
import (
"errors"
"fmt"
"hash/adler32"
"math/rand"
"os/exec"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
// DockerContainerData is the structured representation of the JSON object returned by Docker inspect
@ -106,21 +109,21 @@ func (p dockerPuller) Pull(image string) error {
// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers
func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool) {
func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool, uint64) {
for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
dockerManifestID, dockerContainerName, hash := parseDockerName(dockerContainer.Names[0])
if dockerManifestID == podFullName && dockerContainerName == containerName {
return dockerContainer, true
return dockerContainer, true, hash
}
}
return nil, false
return nil, false, 0
}
func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
containers := make(map[string]*docker.APIContainers)
for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
dockerManifestID, dockerContainerName, _ := parseDockerName(dockerContainer.Names[0])
if dockerManifestID == podFullName {
containers[dockerContainerName] = dockerContainer
}
@ -160,7 +163,7 @@ func getDockerPodInfo(client DockerInterface, podFullName string) (api.PodInfo,
}
for _, value := range containers {
dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
dockerManifestID, dockerContainerName, _ := parseDockerName(value.Names[0])
if dockerManifestID != podFullName {
continue
}
@ -198,15 +201,22 @@ func unescapeDash(in string) (out string) {
const containerNamePrefix = "k8s"
func hashContainer(container *api.Container) uint64 {
hash := adler32.New()
fmt.Fprintf(hash, "%#v", *container)
return uint64(hash.Sum32())
}
// Creates a name which can be reversed to identify both full pod name and container name.
func buildDockerName(pod *Pod, container *api.Container) string {
containerName := escapeDash(container.Name) + "." + strconv.FormatUint(hashContainer(container), 16)
// Note, manifest.ID could be blank.
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, containerName, escapeDash(GetPodFullName(pod)), rand.Uint32())
}
// Upacks a container name, returning the pod full name and container name we would have used to
// construct the docker name. If the docker name isn't one we created, we may return empty strings.
func parseDockerName(name string) (podFullName, containerName string) {
func parseDockerName(name string) (podFullName, containerName string, hash uint64) {
// For some reason docker appears to be appending '/' to names.
// If it's there, strip it.
if name[0] == '/' {
@ -217,7 +227,15 @@ func parseDockerName(name string) (podFullName, containerName string) {
return
}
if len(parts) > 1 {
containerName = unescapeDash(parts[1])
pieces := strings.Split(parts[1], ".")
containerName = unescapeDash(pieces[0])
if len(pieces) > 1 {
var err error
hash, err = strconv.ParseUint(pieces[1], 16, 32)
if err != nil {
glog.Infof("invalid container hash: %s", pieces[1])
}
}
}
if len(parts) > 2 {
podFullName = unescapeDash(parts[2])

View File

@ -322,7 +322,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
glog.Infof("Killing: %s", dockerContainer.ID)
err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName := parseDockerName(dockerContainer.Names[0])
podFullName, containerName, _ := parseDockerName(dockerContainer.Names[0])
kl.LogEvent(&api.Event{
Event: "STOP",
Manifest: &api.ContainerManifest{
@ -366,7 +366,7 @@ func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerConta
errs := make(chan error, len(pod.Manifest.Containers))
wg := sync.WaitGroup{}
for _, container := range pod.Manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, container.Name); found {
count++
wg.Add(1)
go func() {
@ -400,7 +400,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
// Make sure we have a network container
var netID DockerID
if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
netID = DockerID(networkDockerContainer.ID)
} else {
glog.Infof("Network container doesn't exist, creating")
@ -442,23 +442,28 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
}
for _, container := range pod.Manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
expectedHash := hashContainer(&container)
if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, container.Name); found {
containerID := DockerID(dockerContainer.ID)
glog.Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(podState, container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
// look for changes in the container.
if hash == 0 || hash == expectedHash {
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(podState, container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
}
if healthy == health.Healthy {
containersToKeep[containerID] = empty{}
continue
}
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
} else {
glog.V(1).Infof("container hash changed %d vs %d.", hash, expectedHash)
}
if healthy == health.Healthy {
containersToKeep[containerID] = empty{}
continue
}
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
if err := kl.killContainer(dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
@ -482,7 +487,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
// Kill any containers in this pod which were not identified above (guards against duplicates).
for id, container := range dockerContainers {
curPodFullName, _ := parseDockerName(container.Names[0])
curPodFullName, _, _ := parseDockerName(container.Names[0])
if curPodFullName == podFullName {
// Don't kill containers we want to keep or those we already killed.
_, keep := containersToKeep[id]
@ -577,7 +582,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
}
for _, container := range existingContainers {
// Don't kill containers that are in the desired pods.
podFullName, containerName := parseDockerName(container.Names[0])
podFullName, containerName, _ := parseDockerName(container.Names[0])
if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok {
err = kl.killContainer(container)
if err != nil {
@ -681,7 +686,7 @@ func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info
if err != nil {
return nil, err
}
dockerContainer, found := dockerContainers.FindPodContainer(podFullName, containerName)
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, containerName)
if !found {
return nil, errors.New("couldn't find container")
}
@ -727,7 +732,7 @@ func (kl *Kubelet) RunInContainer(pod *Pod, container string, cmd []string) ([]b
if err != nil {
return nil, err
}
dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container)
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, container)
if !found {
return nil, fmt.Errorf("container not found (%s)", container)
}

View File

@ -19,8 +19,10 @@ package kubelet
import (
"encoding/json"
"fmt"
"hash/adler32"
"reflect"
"strings"
"regexp"
"strconv"
"sync"
"testing"
"time"
@ -70,14 +72,19 @@ func verifyStringArrayEquals(t *testing.T, actual, expected []string) {
}
func verifyPackUnpack(t *testing.T, podNamespace, podName, containerName string) {
container := &api.Container{Name: containerName}
hasher := adler32.New()
data := fmt.Sprintf("%#v", *container)
hasher.Write([]byte(data))
computedHash := uint64(hasher.Sum32())
name := buildDockerName(
&Pod{Name: podName, Namespace: podNamespace},
&api.Container{Name: containerName},
container,
)
podFullName := fmt.Sprintf("%s.%s", podName, podNamespace)
returnedPodFullName, returnedContainerName := parseDockerName(name)
if podFullName != returnedPodFullName || containerName != returnedContainerName {
t.Errorf("For (%s, %s), unpacked (%s, %s)", podFullName, containerName, returnedPodFullName, returnedContainerName)
returnedPodFullName, returnedContainerName, hash := parseDockerName(name)
if podFullName != returnedPodFullName || containerName != returnedContainerName || computedHash != hash {
t.Errorf("For (%s, %s, %d), unpacked (%s, %s, %d)", podFullName, containerName, computedHash, returnedPodFullName, returnedContainerName, hash)
}
}
@ -93,6 +100,17 @@ func TestContainerManifestNaming(t *testing.T) {
verifyPackUnpack(t, "file", "--manifest", "__container")
verifyPackUnpack(t, "", "m___anifest_", "container-_-")
verifyPackUnpack(t, "other", "_m___anifest", "-_-container")
container := &api.Container{Name: "container"}
pod := &Pod{Name: "foo", Namespace: "test"}
name := fmt.Sprintf("k8s--%s--%s.%s--12345", container.Name, pod.Name, pod.Namespace)
podFullName := fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)
returnedPodFullName, returnedContainerName, hash := parseDockerName(name)
if returnedPodFullName != podFullName || returnedContainerName != container.Name || hash != 0 {
t.Errorf("unexpected parse: %s %s %d", returnedPodFullName, returnedContainerName, hash)
}
}
func TestGetContainerID(t *testing.T) {
@ -119,13 +137,13 @@ func TestGetContainerID(t *testing.T) {
t.Errorf("Expected %#v, Got %#v", fakeDocker.containerList, dockerContainers)
}
verifyCalls(t, fakeDocker, []string{"list"})
dockerContainer, found := dockerContainers.FindPodContainer("qux", "foo")
dockerContainer, found, _ := dockerContainers.FindPodContainer("qux", "foo")
if dockerContainer == nil || !found {
t.Errorf("Failed to find container %#v", dockerContainer)
}
fakeDocker.clearCalls()
dockerContainer, found = dockerContainers.FindPodContainer("foobar", "foo")
dockerContainer, found, _ = dockerContainers.FindPodContainer("foobar", "foo")
verifyCalls(t, fakeDocker, []string{})
if dockerContainer != nil || found {
t.Errorf("Should not have found container %#v", dockerContainer)
@ -206,10 +224,11 @@ func (cr *channelReader) GetList() [][]Pod {
func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
container := api.Container{Name: "bar"}
fakeDocker.containerList = []docker.APIContainers{
{
// format is k8s--<container-id>--<pod-fullname>
Names: []string{"/k8s--bar--foo.test"},
Names: []string{"/k8s--bar." + strconv.FormatUint(hashContainer(&container), 16) + "--foo.test"},
ID: "1234",
},
{
@ -218,9 +237,6 @@ func TestSyncPodsDoesNothing(t *testing.T) {
ID: "9876",
},
}
fakeDocker.container = &docker.Container{
ID: "1234",
}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
@ -228,7 +244,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
container,
},
},
},
@ -253,6 +269,14 @@ func (kl *Kubelet) drainWorkers() {
}
}
func matchString(t *testing.T, pattern, str string) bool {
match, err := regexp.MatchString(pattern, str)
if err != nil {
t.Logf("unexpected error: %v", err)
}
return match
}
func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{}
@ -278,8 +302,8 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
fakeDocker.lock.Lock()
if len(fakeDocker.Created) != 2 ||
!strings.HasPrefix(fakeDocker.Created[0], "k8s--net--foo.test--") ||
!strings.HasPrefix(fakeDocker.Created[1], "k8s--bar--foo.test--") {
!matchString(t, "k8s--net\\.[a-f0-9]+--foo.test--", fakeDocker.Created[0]) ||
!matchString(t, "k8s--bar\\.[a-f0-9]+--foo.test--", fakeDocker.Created[1]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.lock.Unlock()
@ -316,7 +340,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) {
fakeDocker.lock.Lock()
if len(fakeDocker.Created) != 1 ||
!strings.HasPrefix(fakeDocker.Created[0], "k8s--bar--foo.test--") {
!matchString(t, "k8s--bar\\.[a-f0-9]+--foo.test--", fakeDocker.Created[0]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.lock.Unlock()
@ -453,6 +477,48 @@ func (f *FalseHealthChecker) HealthCheck(state api.PodState, container api.Conta
return health.Unhealthy, nil
}
func TestSyncPodBadHash(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.healthChecker = &FalseHealthChecker{}
dockerContainers := DockerContainers{
"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--bar.1234--foo.test"},
ID: "1234",
},
"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
err := kubelet.syncPod(&Pod{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
}, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
// A map interation is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
}
if len(fakeDocker.stopped) != 1 ||
!expectedToStop[fakeDocker.stopped[0]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
}
}
func TestSyncPodUnhealthy(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.healthChecker = &FalseHealthChecker{}