Merge pull request #10182 from bprashanth/kubelet_status

Fix kubelet deadlock

commit d212ea17cd by Jeff Lowdermilk, 2015-06-23 15:40:49 -07:00
7 changed files with 139 additions and 4 deletions


@@ -57,7 +57,8 @@ type Runtime interface {
 	// KillPod kills all the containers of a pod.
 	KillPod(pod Pod) error
 	// GetPodStatus retrieves the status of the pod, including the information of
-	// all containers in the pod.
+	// all containers in the pod. Clients of this interface assume the containers
+	// statuses in a pod always have a deterministic ordering (eg: sorted by name).
 	GetPodStatus(*api.Pod) (*api.PodStatus, error)
 	// PullImage pulls an image from the network to local storage using the supplied
 	// secrets if necessary.
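Why the contract matters: the status manager (later in this diff) compares pod statuses with reflect.DeepEqual, which is order-sensitive for slices, so nondeterministic container ordering makes identical statuses look different on every sync. A self-contained sketch of that failure mode, not taken from this PR (the ContainerStatus stand-in and byName helper are illustrative only):

package main

import (
	"fmt"
	"reflect"
	"sort"
)

// Stand-in for api.ContainerStatus; the real type carries many more fields.
type ContainerStatus struct{ Name string }

// byName mirrors the SortedContainerStatuses helper added by this PR.
type byName []ContainerStatus

func (s byName) Len() int           { return len(s) }
func (s byName) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s byName) Less(i, j int) bool { return s[i].Name < s[j].Name }

func main() {
	a := []ContainerStatus{{"web"}, {"sidecar"}}
	b := []ContainerStatus{{"sidecar"}, {"web"}}
	fmt.Println(reflect.DeepEqual(a, b)) // false: same set, different order
	sort.Sort(byName(a))
	sort.Sort(byName(b))
	fmt.Println(reflect.DeepEqual(a, b)) // true once both are name-sorted
}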


@@ -25,6 +25,7 @@ import (
 	"os"
 	"os/exec"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"

@@ -470,7 +471,10 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
 		}
 		podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, *status)
 	}
+	// Sort the container statuses since clients of this interface expect the list
+	// of containers in a pod to behave like the output of `docker list`, which has a
+	// deterministic order.
+	sort.Sort(kubeletTypes.SortedContainerStatuses(podStatus.ContainerStatuses))
 	return &podStatus, nil
 }


@@ -1992,3 +1992,61 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
 		t.Errorf("Unexpected container path: %s", parts[1])
 	}
 }
+
+func TestGetPodStatusSortedContainers(t *testing.T) {
+	dm, fakeDocker := newTestDockerManager()
+	dockerInspect := map[string]*docker.Container{}
+	dockerList := []docker.APIContainers{}
+	specContainerList := []api.Container{}
+	expectedOrder := []string{}
+	numContainers := 10
+	podName := "foo"
+	podNs := "test"
+	podUID := "uid1"
+	fakeConfig := &docker.Config{
+		Image: "some:latest",
+	}
+	for i := 0; i < numContainers; i++ {
+		id := fmt.Sprintf("%v", i)
+		containerName := fmt.Sprintf("%vcontainer", id)
+		expectedOrder = append(expectedOrder, containerName)
+		dockerInspect[id] = &docker.Container{
+			ID:     id,
+			Name:   containerName,
+			Config: fakeConfig,
+			Image:  fmt.Sprintf("%vimageid", id),
+		}
+		dockerList = append(dockerList, docker.APIContainers{
+			ID:    id,
+			Names: []string{fmt.Sprintf("/k8s_%v_%v_%v_%v_42", containerName, podName, podNs, podUID)},
+		})
+		specContainerList = append(specContainerList, api.Container{Name: containerName})
+	}
+	fakeDocker.ContainerMap = dockerInspect
+	fakeDocker.ContainerList = dockerList
+	fakeDocker.ClearCalls()
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			UID:       types.UID(podUID),
+			Name:      podName,
+			Namespace: podNs,
+		},
+		Spec: api.PodSpec{
+			Containers: specContainerList,
+		},
+	}
+	for i := 0; i < 5; i++ {
+		status, err := dm.GetPodStatus(pod)
+		if err != nil {
+			t.Fatalf("unexpected error %v", err)
+		}
+		for i, c := range status.ContainerStatuses {
+			if expectedOrder[i] != c.Name {
+				t.Fatalf("Container status not sorted, expected %v at index %d, but found %v", expectedOrder[i], i, c.Name)
+			}
+		}
+	}
+}


@@ -1845,11 +1845,14 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
 }
 
 func (kl *Kubelet) updateRuntimeUp() {
+	start := time.Now()
 	err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
 	kl.runtimeMutex.Lock()
 	defer kl.runtimeMutex.Unlock()
 	if err == nil {
 		kl.lastTimestampRuntimeUp = time.Now()
+	} else {
+		glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err)
 	}
 }
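Note that waitUntilRuntimeIsUp itself is not part of this diff; only its call site appears above. As a hedged sketch, a helper with that shape would poll a cheap liveness probe until the timeout elapses, which is what makes the elapsed time reported in the new error message meaningful (runtimeProber, the Version probe, and the 10ms poll interval are assumptions, and fmt/time are assumed imported):

// Illustrative only; not the kubelet's real helper.
type runtimeProber interface {
	Version() error
}

func waitUntilRuntimeIsUp(rt runtimeProber, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	var err error
	for {
		if err = rt.Version(); err == nil {
			return nil // runtime answered; it is up
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("container runtime is down: %v", err)
		}
		time.Sleep(10 * time.Millisecond) // brief pause between probes
	}
}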


@@ -19,11 +19,13 @@ package kubelet
 import (
 	"fmt"
 	"reflect"
+	"sort"
 	"sync"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
+	kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/golang/glog"
 )
@@ -51,6 +53,16 @@ func newStatusManager(kubeClient client.Interface) *statusManager {
 	}
 }
 
+// isStatusEqual returns true if the given pod statuses are equal, false otherwise.
+// This method sorts container statuses so order does not affect equality.
+func isStatusEqual(oldStatus, status *api.PodStatus) bool {
+	sort.Sort(kubeletTypes.SortedContainerStatuses(status.ContainerStatuses))
+	sort.Sort(kubeletTypes.SortedContainerStatuses(oldStatus.ContainerStatuses))
+	// TODO: More sophisticated equality checking.
+	return reflect.DeepEqual(status, oldStatus)
+}
+
 func (s *statusManager) Start() {
 	// syncBatch blocks when no updates are available, we can run it in a tight loop.
 	glog.Info("Starting to sync pod status with apiserver")
@@ -96,7 +108,13 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 		}
 	}
-	if !found || !reflect.DeepEqual(oldStatus, status) {
+	// TODO: Holding a lock during blocking operations is dangerous. Refactor so this isn't necessary.
+	// The intent here is to prevent concurrent updates to a pod's status from
+	// clobbering each other so the phase of a pod progresses monotonically.
+	// Currently this routine is not called for the same pod from multiple
+	// workers and/or the kubelet but dropping the lock before sending the
+	// status down the channel feels like an easy way to get a bullet in foot.
+	if !found || !isStatusEqual(&oldStatus, &status) {
 		s.podStatuses[podFullName] = status
 		s.podStatusChannel <- podStatusSyncRequest{pod, status}
 	} else {
@@ -148,6 +166,10 @@ func (s *statusManager) syncBatch() error {
 		// We failed to update status. In order to make sure we retry next time
 		// we delete cached value. This may result in an additional update, but
 		// this is ok.
-		s.DeletePodStatus(podFullName)
+		// Doing this synchronously will lead to a deadlock if the podStatusChannel
+		// is full, and the pod worker holding the lock is waiting on this method
+		// to clear the channel. Even if this delete never runs subsequent container
+		// changes on the node should trigger updates.
+		go s.DeletePodStatus(podFullName)
 		return fmt.Errorf("error updating status for pod %q: %v", pod.Name, err)
 	}


@@ -17,6 +17,7 @@ limitations under the License.
 package kubelet
 
 import (
+	"fmt"
 	"math/rand"
 	"strconv"
 	"testing"
@@ -159,3 +160,35 @@ func TestSyncBatch(t *testing.T) {
 	}
 	verifyActions(t, syncer.kubeClient, []string{"get-pod", "update-status-pod"})
 }
+
+// shuffle returns a new shuffled list of container statuses.
+func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
+	numStatuses := len(statuses)
+	randIndexes := rand.Perm(numStatuses)
+	shuffled := make([]api.ContainerStatus, numStatuses)
+	for i := 0; i < numStatuses; i++ {
+		shuffled[i] = statuses[randIndexes[i]]
+	}
+	return shuffled
+}
+
+func TestStatusEquality(t *testing.T) {
+	containerStatus := []api.ContainerStatus{}
+	for i := 0; i < 10; i++ {
+		s := api.ContainerStatus{
+			Name: fmt.Sprintf("container%d", i),
+		}
+		containerStatus = append(containerStatus, s)
+	}
+	podStatus := api.PodStatus{
+		ContainerStatuses: containerStatus,
+	}
+	for i := 0; i < 10; i++ {
+		oldPodStatus := api.PodStatus{
+			ContainerStatuses: shuffle(podStatus.ContainerStatuses),
+		}
+		if !isStatusEqual(&oldPodStatus, &podStatus) {
+			t.Fatalf("Order of container statuses should not affect equality.")
+		}
+	}
+}


@@ -19,8 +19,12 @@ package types
 import (
 	"net/http"
 	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 )
 
+// TODO: Reconcile custom types in kubelet/types and this subpackage
+
 // DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids
 type DockerID string
@@ -56,3 +60,13 @@ func (t *Timestamp) Get() time.Time {
 func (t *Timestamp) GetString() string {
 	return t.time.Format(time.RFC3339Nano)
 }
+
+// A type to help sort container statuses based on container names.
+type SortedContainerStatuses []api.ContainerStatus
+
+func (s SortedContainerStatuses) Len() int      { return len(s) }
+func (s SortedContainerStatuses) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s SortedContainerStatuses) Less(i, j int) bool {
+	return s[i].Name < s[j].Name
+}