diff --git a/pkg/kubelet/docker.go b/pkg/kubelet/docker.go index b7b9217e449..eef646c6f66 100644 --- a/pkg/kubelet/docker.go +++ b/pkg/kubelet/docker.go @@ -19,12 +19,15 @@ package kubelet import ( "errors" "fmt" + "hash/adler32" "math/rand" "os/exec" + "strconv" "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/fsouza/go-dockerclient" + "github.com/golang/glog" ) // DockerContainerData is the structured representation of the JSON object returned by Docker inspect @@ -106,21 +109,21 @@ func (p dockerPuller) Pull(image string) error { // DockerContainers is a map of containers type DockerContainers map[DockerID]*docker.APIContainers -func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool) { +func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool, uint64) { for _, dockerContainer := range c { - dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0]) + dockerManifestID, dockerContainerName, hash := parseDockerName(dockerContainer.Names[0]) if dockerManifestID == podFullName && dockerContainerName == containerName { - return dockerContainer, true + return dockerContainer, true, hash } } - return nil, false + return nil, false, 0 } func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers { containers := make(map[string]*docker.APIContainers) for _, dockerContainer := range c { - dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0]) + dockerManifestID, dockerContainerName, _ := parseDockerName(dockerContainer.Names[0]) if dockerManifestID == podFullName { containers[dockerContainerName] = dockerContainer } @@ -160,7 +163,7 @@ func getDockerPodInfo(client DockerInterface, podFullName string) (api.PodInfo, } for _, value := range containers { - dockerManifestID, dockerContainerName := parseDockerName(value.Names[0]) + dockerManifestID, dockerContainerName, _ := parseDockerName(value.Names[0]) if dockerManifestID != podFullName { continue } @@ -198,15 +201,22 @@ func unescapeDash(in string) (out string) { const containerNamePrefix = "k8s" +func hashContainer(container *api.Container) uint64 { + hash := adler32.New() + fmt.Fprintf(hash, "%#v", *container) + return uint64(hash.Sum32()) +} + // Creates a name which can be reversed to identify both full pod name and container name. func buildDockerName(pod *Pod, container *api.Container) string { + containerName := escapeDash(container.Name) + "." + strconv.FormatUint(hashContainer(container), 16) // Note, manifest.ID could be blank. - return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32()) + return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, containerName, escapeDash(GetPodFullName(pod)), rand.Uint32()) } // Upacks a container name, returning the pod full name and container name we would have used to // construct the docker name. If the docker name isn't one we created, we may return empty strings. -func parseDockerName(name string) (podFullName, containerName string) { +func parseDockerName(name string) (podFullName, containerName string, hash uint64) { // For some reason docker appears to be appending '/' to names. // If it's there, strip it. if name[0] == '/' { @@ -217,7 +227,15 @@ func parseDockerName(name string) (podFullName, containerName string) { return } if len(parts) > 1 { - containerName = unescapeDash(parts[1]) + pieces := strings.Split(parts[1], ".") + containerName = unescapeDash(pieces[0]) + if len(pieces) > 1 { + var err error + hash, err = strconv.ParseUint(pieces[1], 16, 32) + if err != nil { + glog.Infof("invalid container hash: %s", pieces[1]) + } + } } if len(parts) > 2 { podFullName = unescapeDash(parts[2]) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c8aeeaa55fa..6c4d8c5d8e6 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -322,7 +322,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error { glog.Infof("Killing: %s", dockerContainer.ID) err := kl.dockerClient.StopContainer(dockerContainer.ID, 10) - podFullName, containerName := parseDockerName(dockerContainer.Names[0]) + podFullName, containerName, _ := parseDockerName(dockerContainer.Names[0]) kl.LogEvent(&api.Event{ Event: "STOP", Manifest: &api.ContainerManifest{ @@ -366,7 +366,7 @@ func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerConta errs := make(chan error, len(pod.Manifest.Containers)) wg := sync.WaitGroup{} for _, container := range pod.Manifest.Containers { - if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found { + if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, container.Name); found { count++ wg.Add(1) go func() { @@ -400,7 +400,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { // Make sure we have a network container var netID DockerID - if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found { + if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, networkContainerName); found { netID = DockerID(networkDockerContainer.ID) } else { glog.Infof("Network container doesn't exist, creating") @@ -442,23 +442,28 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { } for _, container := range pod.Manifest.Containers { - if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found { + expectedHash := hashContainer(&container) + if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, container.Name); found { containerID := DockerID(dockerContainer.ID) glog.Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID) glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID) - // TODO: This should probably be separated out into a separate goroutine. - healthy, err := kl.healthy(podState, container, dockerContainer) - if err != nil { - glog.V(1).Infof("health check errored: %v", err) - continue + // look for changes in the container. + if hash == 0 || hash == expectedHash { + // TODO: This should probably be separated out into a separate goroutine. + healthy, err := kl.healthy(podState, container, dockerContainer) + if err != nil { + glog.V(1).Infof("health check errored: %v", err) + continue + } + if healthy == health.Healthy { + containersToKeep[containerID] = empty{} + continue + } + glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy) + } else { + glog.V(1).Infof("container hash changed %d vs %d.", hash, expectedHash) } - if healthy == health.Healthy { - containersToKeep[containerID] = empty{} - continue - } - - glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy) if err := kl.killContainer(dockerContainer); err != nil { glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err) continue @@ -482,7 +487,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { // Kill any containers in this pod which were not identified above (guards against duplicates). for id, container := range dockerContainers { - curPodFullName, _ := parseDockerName(container.Names[0]) + curPodFullName, _, _ := parseDockerName(container.Names[0]) if curPodFullName == podFullName { // Don't kill containers we want to keep or those we already killed. _, keep := containersToKeep[id] @@ -577,7 +582,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { } for _, container := range existingContainers { // Don't kill containers that are in the desired pods. - podFullName, containerName := parseDockerName(container.Names[0]) + podFullName, containerName, _ := parseDockerName(container.Names[0]) if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok { err = kl.killContainer(container) if err != nil { @@ -681,7 +686,7 @@ func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info if err != nil { return nil, err } - dockerContainer, found := dockerContainers.FindPodContainer(podFullName, containerName) + dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, containerName) if !found { return nil, errors.New("couldn't find container") } @@ -727,7 +732,7 @@ func (kl *Kubelet) RunInContainer(pod *Pod, container string, cmd []string) ([]b if err != nil { return nil, err } - dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container) + dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, container) if !found { return nil, fmt.Errorf("container not found (%s)", container) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 3ffcef77857..f71449a7852 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -19,8 +19,10 @@ package kubelet import ( "encoding/json" "fmt" + "hash/adler32" "reflect" - "strings" + "regexp" + "strconv" "sync" "testing" "time" @@ -70,14 +72,19 @@ func verifyStringArrayEquals(t *testing.T, actual, expected []string) { } func verifyPackUnpack(t *testing.T, podNamespace, podName, containerName string) { + container := &api.Container{Name: containerName} + hasher := adler32.New() + data := fmt.Sprintf("%#v", *container) + hasher.Write([]byte(data)) + computedHash := uint64(hasher.Sum32()) name := buildDockerName( &Pod{Name: podName, Namespace: podNamespace}, - &api.Container{Name: containerName}, + container, ) podFullName := fmt.Sprintf("%s.%s", podName, podNamespace) - returnedPodFullName, returnedContainerName := parseDockerName(name) - if podFullName != returnedPodFullName || containerName != returnedContainerName { - t.Errorf("For (%s, %s), unpacked (%s, %s)", podFullName, containerName, returnedPodFullName, returnedContainerName) + returnedPodFullName, returnedContainerName, hash := parseDockerName(name) + if podFullName != returnedPodFullName || containerName != returnedContainerName || computedHash != hash { + t.Errorf("For (%s, %s, %d), unpacked (%s, %s, %d)", podFullName, containerName, computedHash, returnedPodFullName, returnedContainerName, hash) } } @@ -93,6 +100,17 @@ func TestContainerManifestNaming(t *testing.T) { verifyPackUnpack(t, "file", "--manifest", "__container") verifyPackUnpack(t, "", "m___anifest_", "container-_-") verifyPackUnpack(t, "other", "_m___anifest", "-_-container") + + container := &api.Container{Name: "container"} + pod := &Pod{Name: "foo", Namespace: "test"} + name := fmt.Sprintf("k8s--%s--%s.%s--12345", container.Name, pod.Name, pod.Namespace) + + podFullName := fmt.Sprintf("%s.%s", pod.Name, pod.Namespace) + returnedPodFullName, returnedContainerName, hash := parseDockerName(name) + if returnedPodFullName != podFullName || returnedContainerName != container.Name || hash != 0 { + t.Errorf("unexpected parse: %s %s %d", returnedPodFullName, returnedContainerName, hash) + } + } func TestGetContainerID(t *testing.T) { @@ -119,13 +137,13 @@ func TestGetContainerID(t *testing.T) { t.Errorf("Expected %#v, Got %#v", fakeDocker.containerList, dockerContainers) } verifyCalls(t, fakeDocker, []string{"list"}) - dockerContainer, found := dockerContainers.FindPodContainer("qux", "foo") + dockerContainer, found, _ := dockerContainers.FindPodContainer("qux", "foo") if dockerContainer == nil || !found { t.Errorf("Failed to find container %#v", dockerContainer) } fakeDocker.clearCalls() - dockerContainer, found = dockerContainers.FindPodContainer("foobar", "foo") + dockerContainer, found, _ = dockerContainers.FindPodContainer("foobar", "foo") verifyCalls(t, fakeDocker, []string{}) if dockerContainer != nil || found { t.Errorf("Should not have found container %#v", dockerContainer) @@ -206,10 +224,11 @@ func (cr *channelReader) GetList() [][]Pod { func TestSyncPodsDoesNothing(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) + container := api.Container{Name: "bar"} fakeDocker.containerList = []docker.APIContainers{ { // format is k8s---- - Names: []string{"/k8s--bar--foo.test"}, + Names: []string{"/k8s--bar." + strconv.FormatUint(hashContainer(&container), 16) + "--foo.test"}, ID: "1234", }, { @@ -218,9 +237,6 @@ func TestSyncPodsDoesNothing(t *testing.T) { ID: "9876", }, } - fakeDocker.container = &docker.Container{ - ID: "1234", - } err := kubelet.SyncPods([]Pod{ { Name: "foo", @@ -228,7 +244,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { Manifest: api.ContainerManifest{ ID: "foo", Containers: []api.Container{ - {Name: "bar"}, + container, }, }, }, @@ -253,6 +269,14 @@ func (kl *Kubelet) drainWorkers() { } } +func matchString(t *testing.T, pattern, str string) bool { + match, err := regexp.MatchString(pattern, str) + if err != nil { + t.Logf("unexpected error: %v", err) + } + return match +} + func TestSyncPodsCreatesNetAndContainer(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) fakeDocker.containerList = []docker.APIContainers{} @@ -278,8 +302,8 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { fakeDocker.lock.Lock() if len(fakeDocker.Created) != 2 || - !strings.HasPrefix(fakeDocker.Created[0], "k8s--net--foo.test--") || - !strings.HasPrefix(fakeDocker.Created[1], "k8s--bar--foo.test--") { + !matchString(t, "k8s--net\\.[a-f0-9]+--foo.test--", fakeDocker.Created[0]) || + !matchString(t, "k8s--bar\\.[a-f0-9]+--foo.test--", fakeDocker.Created[1]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.lock.Unlock() @@ -316,7 +340,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { fakeDocker.lock.Lock() if len(fakeDocker.Created) != 1 || - !strings.HasPrefix(fakeDocker.Created[0], "k8s--bar--foo.test--") { + !matchString(t, "k8s--bar\\.[a-f0-9]+--foo.test--", fakeDocker.Created[0]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.lock.Unlock() @@ -453,6 +477,48 @@ func (f *FalseHealthChecker) HealthCheck(state api.PodState, container api.Conta return health.Unhealthy, nil } +func TestSyncPodBadHash(t *testing.T) { + kubelet, _, fakeDocker := makeTestKubelet(t) + kubelet.healthChecker = &FalseHealthChecker{} + dockerContainers := DockerContainers{ + "1234": &docker.APIContainers{ + // the k8s prefix is required for the kubelet to manage the container + Names: []string{"/k8s--bar.1234--foo.test"}, + ID: "1234", + }, + "9876": &docker.APIContainers{ + // network container + Names: []string{"/k8s--net--foo.test--"}, + ID: "9876", + }, + } + err := kubelet.syncPod(&Pod{ + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, dockerContainers) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"}) + + // A map interation is used to delete containers, so must not depend on + // order here. + expectedToStop := map[string]bool{ + "1234": true, + } + if len(fakeDocker.stopped) != 1 || + !expectedToStop[fakeDocker.stopped[0]] { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped) + } +} + func TestSyncPodUnhealthy(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) kubelet.healthChecker = &FalseHealthChecker{}