diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index a8a49c36756..c0eeaea422c 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -57,7 +57,8 @@ type Runtime interface { // KillPod kills all the containers of a pod. KillPod(pod Pod) error // GetPodStatus retrieves the status of the pod, including the information of - // all containers in the pod. + // all containers in the pod. Clients of this interface assume the containers + // statuses in a pod always have a deterministic ordering (eg: sorted by name). GetPodStatus(*api.Pod) (*api.PodStatus, error) // PullImage pulls an image from the network to local storage using the supplied // secrets if necessary. diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index e27222f3c4e..02d56d9446f 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -25,6 +25,7 @@ import ( "os" "os/exec" "path" + "sort" "strconv" "strings" "sync" @@ -470,7 +471,10 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { } podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, *status) } - + // Sort the container statuses since clients of this interface expect the list + // of containers in a pod to behave like the output of `docker list`, which has a + // deterministic order. + sort.Sort(kubeletTypes.SortedContainerStatuses(podStatus.ContainerStatuses)) return &podStatus, nil } diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index fd5ba06d0e2..061a8daa384 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -1992,3 +1992,61 @@ func TestSyncPodWithTerminationLog(t *testing.T) { t.Errorf("Unexpected container path: %s", parts[1]) } } + +func TestGetPodStatusSortedContainers(t *testing.T) { + dm, fakeDocker := newTestDockerManager() + dockerInspect := map[string]*docker.Container{} + dockerList := []docker.APIContainers{} + specContainerList := []api.Container{} + expectedOrder := []string{} + + numContainers := 10 + podName := "foo" + podNs := "test" + podUID := "uid1" + fakeConfig := &docker.Config{ + Image: "some:latest", + } + + for i := 0; i < numContainers; i++ { + id := fmt.Sprintf("%v", i) + containerName := fmt.Sprintf("%vcontainer", id) + expectedOrder = append(expectedOrder, containerName) + dockerInspect[id] = &docker.Container{ + ID: id, + Name: containerName, + Config: fakeConfig, + Image: fmt.Sprintf("%vimageid", id), + } + dockerList = append(dockerList, docker.APIContainers{ + ID: id, + Names: []string{fmt.Sprintf("/k8s_%v_%v_%v_%v_42", containerName, podName, podNs, podUID)}, + }) + specContainerList = append(specContainerList, api.Container{Name: containerName}) + } + + fakeDocker.ContainerMap = dockerInspect + fakeDocker.ContainerList = dockerList + fakeDocker.ClearCalls() + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(podUID), + Name: podName, + Namespace: podNs, + }, + Spec: api.PodSpec{ + Containers: specContainerList, + }, + } + for i := 0; i < 5; i++ { + status, err := dm.GetPodStatus(pod) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + for i, c := range status.ContainerStatuses { + if expectedOrder[i] != c.Name { + t.Fatalf("Container status not sorted, expected %v at index %d, but found %v", expectedOrder[i], i, c.Name) + } + } + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2553c6bbaeb..47e86f6c9c2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1845,11 +1845,14 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { } func (kl *Kubelet) updateRuntimeUp() { + start := time.Now() err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond) kl.runtimeMutex.Lock() defer kl.runtimeMutex.Unlock() if err == nil { kl.lastTimestampRuntimeUp = time.Now() + } else { + glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err) } } diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index 4df39d7c086..89e9081a10c 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -19,11 +19,13 @@ package kubelet import ( "fmt" "reflect" + "sort" "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -51,6 +53,16 @@ func newStatusManager(kubeClient client.Interface) *statusManager { } } +// isStatusEqual returns true if the given pod statuses are equal, false otherwise. +// This method sorts container statuses so order does not affect equality. +func isStatusEqual(oldStatus, status *api.PodStatus) bool { + sort.Sort(kubeletTypes.SortedContainerStatuses(status.ContainerStatuses)) + sort.Sort(kubeletTypes.SortedContainerStatuses(oldStatus.ContainerStatuses)) + + // TODO: More sophisticated equality checking. + return reflect.DeepEqual(status, oldStatus) +} + func (s *statusManager) Start() { // syncBatch blocks when no updates are available, we can run it in a tight loop. glog.Info("Starting to sync pod status with apiserver") @@ -96,7 +108,13 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) { } } - if !found || !reflect.DeepEqual(oldStatus, status) { + // TODO: Holding a lock during blocking operations is dangerous. Refactor so this isn't necessary. + // The intent here is to prevent concurrent updates to a pod's status from + // clobbering each other so the phase of a pod progresses monotonically. + // Currently this routine is not called for the same pod from multiple + // workers and/or the kubelet but dropping the lock before sending the + // status down the channel feels like an easy way to get a bullet in foot. + if !found || !isStatusEqual(&oldStatus, &status) { s.podStatuses[podFullName] = status s.podStatusChannel <- podStatusSyncRequest{pod, status} } else { @@ -148,6 +166,10 @@ func (s *statusManager) syncBatch() error { // We failed to update status. In order to make sure we retry next time // we delete cached value. This may result in an additional update, but // this is ok. - s.DeletePodStatus(podFullName) + // Doing this synchronously will lead to a deadlock if the podStatusChannel + // is full, and the pod worker holding the lock is waiting on this method + // to clear the channel. Even if this delete never runs subsequent container + // changes on the node should trigger updates. + go s.DeletePodStatus(podFullName) return fmt.Errorf("error updating status for pod %q: %v", pod.Name, err) } diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go index f04a87781b2..fc473aa8032 100644 --- a/pkg/kubelet/status_manager_test.go +++ b/pkg/kubelet/status_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package kubelet import ( + "fmt" "math/rand" "strconv" "testing" @@ -159,3 +160,35 @@ func TestSyncBatch(t *testing.T) { } verifyActions(t, syncer.kubeClient, []string{"get-pod", "update-status-pod"}) } + +// shuffle returns a new shuffled list of container statuses. +func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus { + numStatuses := len(statuses) + randIndexes := rand.Perm(numStatuses) + shuffled := make([]api.ContainerStatus, numStatuses) + for i := 0; i < numStatuses; i++ { + shuffled[i] = statuses[randIndexes[i]] + } + return shuffled +} + +func TestStatusEquality(t *testing.T) { + containerStatus := []api.ContainerStatus{} + for i := 0; i < 10; i++ { + s := api.ContainerStatus{ + Name: fmt.Sprintf("container%d", i), + } + containerStatus = append(containerStatus, s) + } + podStatus := api.PodStatus{ + ContainerStatuses: containerStatus, + } + for i := 0; i < 10; i++ { + oldPodStatus := api.PodStatus{ + ContainerStatuses: shuffle(podStatus.ContainerStatuses), + } + if !isStatusEqual(&oldPodStatus, &podStatus) { + t.Fatalf("Order of container statuses should not affect equality.") + } + } +} diff --git a/pkg/kubelet/types/types.go b/pkg/kubelet/types/types.go index 172c84ae1ab..55ba55c8397 100644 --- a/pkg/kubelet/types/types.go +++ b/pkg/kubelet/types/types.go @@ -19,8 +19,12 @@ package types import ( "net/http" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) +// TODO: Reconcile custom types in kubelet/types and this subpackage + // DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids type DockerID string @@ -56,3 +60,13 @@ func (t *Timestamp) Get() time.Time { func (t *Timestamp) GetString() string { return t.time.Format(time.RFC3339Nano) } + +// A type to help sort container statuses based on container names. +type SortedContainerStatuses []api.ContainerStatus + +func (s SortedContainerStatuses) Len() int { return len(s) } +func (s SortedContainerStatuses) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s SortedContainerStatuses) Less(i, j int) bool { + return s[i].Name < s[j].Name +}