Merge pull request #5019 from yujuhong/reject_pods

kubelet: reject pods on host port conflict
Dawn Chen 2015-03-06 11:42:04 -08:00
commit 2d0743b143
4 changed files with 148 additions and 65 deletions
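
At a high level, the change keeps a small lock-protected status map on the Kubelet: pods that lose a host-port conflict are recorded with a PodFailed status, SyncPods skips them and prunes entries for pods no longer bound to the node, and GetPodStatus serves the cached status instead of querying Docker. Below is a minimal, self-contained sketch of that pattern; the type and function names are illustrative stand-ins (with simplified fields), not the kubelet's actual API.

package main

import (
	"fmt"
	"sync"
)

// podStatus stands in for api.PodStatus; only the fields the sketch needs.
type podStatus struct {
	Phase   string
	Message string
}

// statusCache mirrors the idea of the kubelet's podStatuses map:
// a lock-protected map from pod full name to its last known status.
type statusCache struct {
	mu       sync.RWMutex
	statuses map[string]podStatus
}

func newStatusCache() *statusCache {
	return &statusCache{statuses: make(map[string]podStatus)}
}

func (c *statusCache) get(podFullName string) (podStatus, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.statuses[podFullName]
	return s, ok
}

func (c *statusCache) set(podFullName string, s podStatus) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[podFullName] = s
}

// removeOrphaned drops entries for pods that are no longer bound to this node.
func (c *statusCache) removeOrphaned(bound map[string]bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for name := range c.statuses {
		if !bound[name] {
			delete(c.statuses, name)
		}
	}
}

func main() {
	cache := newStatusCache()

	// A pod that loses the host-port conflict is recorded as failed...
	cache.set("oldpod_default", podStatus{Phase: "Failed", Message: "host port conflict"})

	// ...and a later status query returns the cached result.
	if s, ok := cache.get("oldpod_default"); ok {
		fmt.Println(s.Phase, "-", s.Message)
	}

	// Once the pod is no longer bound to the node, the entry is pruned.
	cache.removeOrphaned(map[string]bool{})
	fmt.Println(len(cache.statuses)) // 0
}

In the actual change, the same idea appears as getPodStatusFromCache, setPodStatusInCache, and removeOrphanedStatuses on the Kubelet, guarded by podStatusesLock.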


@@ -155,6 +155,8 @@ func NewMainKubelet(
return nil, err
}
klet.podStatuses = make(map[string]api.PodStatus)
return klet, nil
}
@@ -235,6 +237,10 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder
// A pod status cache currently used to store rejected pods and their statuses.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -456,6 +462,30 @@ func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
return kl.cadvisorClient
}
func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
kl.podStatusesLock.RLock()
defer kl.podStatusesLock.RUnlock()
status, ok := kl.podStatuses[podFullName]
return status, ok
}
func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
kl.podStatuses[podFullName] = status
}
func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
for key := range kl.podStatuses {
if _, ok := podFullNames[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(kl.podStatuses, key)
}
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
@@ -1309,10 +1339,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
podFullNames := make(map[string]bool)
for _, pod := range allPods {
podFullNames[GetPodFullName(&pod)] = true
}
kl.removeOrphanedStatuses(podFullNames)
// Filter out the rejected pods. They don't have running containers.
var pods []api.BoundPod
for _, pod := range allPods {
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
if ok && status.Phase == api.PodFailed {
continue
}
pods = append(pods, pod)
}
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
@@ -1436,9 +1484,9 @@ func (s podsByCreationTime) Less(i, j int) bool {
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
// getHostPortConflicts detects pods with conflicting host ports and returns them.
func getHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
conflicts := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.ContainerPort) int { return p.HostPort }
@@ -1448,15 +1496,24 @@ func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
// TODO: Set the pod status to fail.
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
conflicts = append(conflicts, *pod)
continue
}
filtered = append(filtered, *pod)
}
return filtered
return conflicts
}
// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
func (kl *Kubelet) handleHostPortConflicts(pods []api.BoundPod) {
conflicts := getHostPortConflicts(pods)
for _, pod := range conflicts {
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to host port conflict"})
}
}
func (kl *Kubelet) handleUpdate(u PodUpdate) {
@@ -1466,12 +1523,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@@ -1537,7 +1593,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
}
kl.pods = u.Pods
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
@@ -1546,9 +1602,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@@ -1746,6 +1801,12 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
}
// Check to see if the pod has been rejected.
mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
if ok {
return mappedPodStatus, nil
}
info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
if err != nil {


@@ -83,6 +83,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
kubelet.serviceLister = testServiceLister{}
kubelet.readiness = newReadinessStates()
kubelet.recorder = recorder
kubelet.podStatuses = map[string]api.PodStatus{}
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
@@ -1204,35 +1205,6 @@ func TestMakePortsAndBindings(t *testing.T) {
}
}
func TestCheckHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
successCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
}
successCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
}
expected := append(successCaseAll, successCaseNew)
if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
failureCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
}
failureCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}
func TestFieldPath(t *testing.T) {
pod := &api.BoundPod{Spec: api.PodSpec{Containers: []api.Container{
{Name: "foo"},
@@ -3088,13 +3060,35 @@ func TestPortForward(t *testing.T) {
}
}
// Tests that upon host port conflict, the newer pod is removed.
func TestFilterHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
// Tests that host port conflicts are detected correctly.
func TestGetHostPortConflicts(t *testing.T) {
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
}
// Pods should not cause any conflict.
conflicts := getHostPortConflicts(pods)
if len(conflicts) != 0 {
t.Errorf("expected no conflicts, Got %#v", conflicts)
}
// Reuse the pod spec with the same port to create a conflict.
// The new pod should cause a conflict and be reported.
expected := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
pods = append(pods, expected)
if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.BoundPod{expected}) {
t.Errorf("expected %#v, Got %#v", expected, actual)
}
}
// Tests that we handle port conflicts correctly by setting the failed status in the status map.
func TestHandlePortConflicts(t *testing.T) {
kl, _, _ := newTestKubelet(t)
spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
var pods = []api.BoundPod{
pods := []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
@@ -3115,12 +3109,48 @@ func TestFilterHostPortConflicts(t *testing.T) {
// Make sure the BoundPods are in the reverse order of creation time.
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
filteredPods := kubelet.filterHostPortConflicts(pods)
if len(filteredPods) != 1 {
t.Fatalf("Expected one pod. Got pods %#v", filteredPods)
// The newer pod should be rejected.
conflictedPodName := GetPodFullName(&pods[0])
kl.handleHostPortConflicts(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
if filteredPods[0].Name != "oldpod" {
t.Fatalf("Expected pod %#v. Got pod %#v", pods[1], filteredPods[0])
// Check pod status stored in the status map.
status, ok := kl.podStatuses[conflictedPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
status, err := kl.GetPodStatus(conflictedPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
}
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
kl, _, _ := newTestKubelet(t)
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
// Run once to populate the status map.
kl.handleHostPortConflicts(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in the status map will be removed.
kl.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
}


@@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
pods = kl.filterHostPortConflicts(pods)
kl.handleHostPortConflicts(pods)
ch := make(chan RunPodResult)
for i := range pods {


@@ -213,16 +213,8 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
newStatus.HostIP = p.getHostAddress(nodeStatus.Addresses)
newStatus.Info = result.Status.Info
newStatus.PodIP = result.Status.PodIP
if newStatus.Info == nil {
// There is a small race window in which the kubelet hasn't
// populated the status yet. This should go away once
// we remove boundPods
newStatus.Phase = api.PodPending
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
} else {
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
return newStatus, err
}