From 3ccdb8db987c9d3870463f01f9c32bfed51b0d9b Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 3 Mar 2015 10:33:25 -0800
Subject: [PATCH] kubelet: reject pods on host port conflict

When a host port conflict is detected, the kubelet should set the pod
status to failed. The failed status will then be polled by other
components at a later time, which allows the replication controller to
create a new pod if necessary.

To achieve this, this change stores the pod status information in a
status map upon detection of a port conflict. GetPodStatus() consults
this status map before attempting to query docker. The entries in the
status map will be removed when the pod is no longer associated with
the node.
---
 pkg/kubelet/kubelet.go      |  91 +++++++++++++++++++++++++-----
 pkg/kubelet/kubelet_test.go | 108 +++++++++++++++++++++++-------------
 pkg/kubelet/runonce.go      |   2 +-
 pkg/master/pod_cache.go     |  12 +---
 4 files changed, 148 insertions(+), 65 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 3091817a4dd..caa2b878a15 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -155,6 +155,8 @@ func NewMainKubelet(
 		return nil, err
 	}
 
+	klet.podStatuses = make(map[string]api.PodStatus)
+
 	return klet, nil
 }
 
@@ -235,6 +237,10 @@ type Kubelet struct {
 
 	// the EventRecorder to use
 	recorder record.EventRecorder
+
+	// A pod status cache currently used to store rejected pods and their statuses.
+	podStatusesLock sync.RWMutex
+	podStatuses     map[string]api.PodStatus
 }
 
 // getRootDir returns the full path to the directory under which kubelet can
@@ -456,6 +462,30 @@ func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
 	return kl.cadvisorClient
 }
 
+func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
+	kl.podStatusesLock.RLock()
+	defer kl.podStatusesLock.RUnlock()
+	status, ok := kl.podStatuses[podFullName]
+	return status, ok
+}
+
+func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
+	kl.podStatusesLock.Lock()
+	defer kl.podStatusesLock.Unlock()
+	kl.podStatuses[podFullName] = status
+}
+
+func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
+	kl.podStatusesLock.Lock()
+	defer kl.podStatusesLock.Unlock()
+	for key := range kl.podStatuses {
+		if _, ok := podFullNames[key]; !ok {
+			glog.V(5).Infof("Removing %q from status map.", key)
+			delete(kl.podStatuses, key)
+		}
+	}
+}
+
 // Run starts the kubelet reacting to config updates
 func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 	if kl.logServer == nil {
@@ -1277,10 +1307,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
 }
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
-func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
+func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
 	defer func() {
 		metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
+
+	// Remove obsolete entries in podStatuses where the pod is no longer considered bound to this node.
+	podFullNames := make(map[string]bool)
+	for _, pod := range allPods {
+		podFullNames[GetPodFullName(&pod)] = true
+	}
+	kl.removeOrphanedStatuses(podFullNames)
+
+	// Filter out the rejected pods. They don't have running containers.
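+	// Their PodFailed statuses were already recorded in podStatuses by
+	// handleHostPortConflicts, so SyncPods does not try to start their containers.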
+	var pods []api.BoundPod
+	for _, pod := range allPods {
+		status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
+		if ok && status.Phase == api.PodFailed {
+			continue
+		}
+		pods = append(pods, pod)
+	}
+
 	glog.V(4).Infof("Desired: %#v", pods)
 	var err error
 	desiredContainers := make(map[podContainer]empty)
@@ -1404,9 +1452,9 @@ func (s podsByCreationTime) Less(i, j int) bool {
 	return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
 }
 
-// filterHostPortConflicts removes pods that conflict on Port.HostPort values
-func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
-	filtered := []api.BoundPod{}
+// getHostPortConflicts detects pods with conflicting host ports and returns them.
+func getHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
+	conflicts := []api.BoundPod{}
 	ports := map[int]bool{}
 	extract := func(p *api.ContainerPort) int { return p.HostPort }
 
@@ -1416,15 +1464,24 @@ func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
 	for i := range pods {
 		pod := &pods[i]
 		if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
-			glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
-			kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
-			// TODO: Set the pod status to fail.
+			glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
+			conflicts = append(conflicts, *pod)
 			continue
 		}
-		filtered = append(filtered, *pod)
 	}
 
-	return filtered
+	return conflicts
+}
+
+// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
+func (kl *Kubelet) handleHostPortConflicts(pods []api.BoundPod) {
+	conflicts := getHostPortConflicts(pods)
+	for _, pod := range conflicts {
+		kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
+		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+			Phase:   api.PodFailed,
+			Message: "Pod cannot be started due to host port conflict"})
+	}
 }
 
 func (kl *Kubelet) handleUpdate(u PodUpdate) {
@@ -1434,12 +1491,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
 	case SET:
 		glog.V(3).Infof("SET: Containers changed")
 		kl.pods = u.Pods
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
+		kl.handleHostPortConflicts(kl.pods)
 	case UPDATE:
 		glog.V(3).Infof("Update: Containers changed")
 		kl.pods = updateBoundPods(u.Pods, kl.pods)
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
-
+		kl.handleHostPortConflicts(kl.pods)
 	default:
 		panic("syncLoop does not support incremental changes")
 	}
@@ -1505,7 +1561,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
 		}
 
 		kl.pods = u.Pods
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
+		kl.handleHostPortConflicts(kl.pods)
 	case UPDATE:
 		glog.V(3).Infof("Update: Containers changed")
@@ -1514,9 +1570,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
 		for i := range u.Pods {
 			podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
 		}
-
 		kl.pods = updateBoundPods(u.Pods, kl.pods)
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
+		kl.handleHostPortConflicts(kl.pods)
 	default:
 		panic("syncLoop does not support incremental changes")
 	}
@@ -1714,6 +1769,12 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
 		return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
 	}
 
+	// Check to see if the pod has been rejected.
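+	// A rejected pod never started any containers, so the status cached at
+	// rejection time is authoritative and docker is not queried for it.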
+	mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
+	if ok {
+		return mappedPodStatus, nil
+	}
+
 	info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
 
 	if err != nil {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 64d66276a47..1098b40569b 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -83,6 +83,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
 	kubelet.serviceLister = testServiceLister{}
 	kubelet.readiness = newReadinessStates()
 	kubelet.recorder = recorder
+	kubelet.podStatuses = map[string]api.PodStatus{}
 	if err := kubelet.setupDataDirs(); err != nil {
 		t.Fatalf("can't initialize kubelet data dirs: %v", err)
 	}
@@ -1204,35 +1205,6 @@ func TestMakePortsAndBindings(t *testing.T) {
 	}
 }
 
-func TestCheckHostPortConflicts(t *testing.T) {
-	kubelet, _, _ := newTestKubelet(t)
-
-	successCaseAll := []api.BoundPod{
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
-	}
-	successCaseNew := api.BoundPod{
-		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
-	}
-	expected := append(successCaseAll, successCaseNew)
-	if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
-		t.Errorf("Expected %#v, Got %#v", expected, actual)
-	}
-
-	failureCaseAll := []api.BoundPod{
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
-	}
-	failureCaseNew := api.BoundPod{
-		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
-	}
-	if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
-		t.Errorf("Expected %#v, Got %#v", expected, actual)
-	}
-}
-
 func TestFieldPath(t *testing.T) {
 	pod := &api.BoundPod{Spec: api.PodSpec{Containers: []api.Container{
 		{Name: "foo"},
@@ -3088,13 +3060,35 @@ func TestPortForward(t *testing.T) {
 	}
 }
 
-// Tests that upon host port conflict, the newer pod is removed.
-func TestFilterHostPortConflicts(t *testing.T) {
-	kubelet, _, _ := newTestKubelet(t)
+// Tests that host port conflicts are detected correctly.
+func TestGetHostPortConflicts(t *testing.T) {
+	pods := []api.BoundPod{
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
+	}
+	// Pods should not cause any conflict.
+	conflicts := getHostPortConflicts(pods)
+	if len(conflicts) != 0 {
+		t.Errorf("expected no conflicts, Got %#v", conflicts)
+	}
 
-	// Reuse the pod spec with the same port to create a conflict.
+	// The new pod should cause a conflict and be reported.
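+	// HostPort 81 is already claimed by the second pod above, so only this
+	// new pod should be returned as a conflict.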
+	expected := api.BoundPod{
+		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
+	}
+	pods = append(pods, expected)
+	if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.BoundPod{expected}) {
+		t.Errorf("expected %#v, Got %#v", expected, actual)
+	}
+}
+
+// Tests that we handle port conflicts correctly by setting the failed status in the status map.
+func TestHandlePortConflicts(t *testing.T) {
+	kl, _, _ := newTestKubelet(t)
 	spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
-	var pods = []api.BoundPod{
+	pods := []api.BoundPod{
 		{
 			ObjectMeta: api.ObjectMeta{
 				UID: "123456789",
@@ -3115,12 +3109,48 @@ func TestFilterHostPortConflicts(t *testing.T) {
 	// Make sure the BoundPods are in the reverse order of creation time.
 	pods[1].CreationTimestamp = util.NewTime(time.Now())
 	pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
-	filteredPods := kubelet.filterHostPortConflicts(pods)
-	if len(filteredPods) != 1 {
-		t.Fatalf("Expected one pod. Got pods %#v", filteredPods)
+	// The newer pod should be rejected.
+	conflictedPodName := GetPodFullName(&pods[0])
+
+	kl.handleHostPortConflicts(pods)
+	if len(kl.podStatuses) != 1 {
+		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
 	}
-	if filteredPods[0].Name != "oldpod" {
-		t.Fatalf("Expected pod %#v. Got pod %#v", pods[1], filteredPods[0])
+	// Check pod status stored in the status map.
+	status, ok := kl.podStatuses[conflictedPodName]
+	if !ok {
+		t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName)
+	}
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+	}
+
+	// Check if we can retrieve the pod status from GetPodStatus().
+	kl.pods = pods
+	status, err := kl.GetPodStatus(conflictedPodName, "")
+	if err != nil {
+		t.Fatalf("unable to retrieve pod status for pod %q: %v.", conflictedPodName, err)
+	}
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+	}
+}
+
+func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
+	kl, _, _ := newTestKubelet(t)
+	pods := []api.BoundPod{
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+	}
+	// Run once to populate the status map.
+	kl.handleHostPortConflicts(pods)
+	if len(kl.podStatuses) != 1 {
+		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
+	}
+	// Sync with empty pods so that the entry in the status map will be removed.
+	kl.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
+	if len(kl.podStatuses) != 0 {
+		t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
 	}
 }
 
diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go
index d897819cebc..6389b36f4a2 100644
--- a/pkg/kubelet/runonce.go
+++ b/pkg/kubelet/runonce.go
@@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err
 	if kl.dockerPuller == nil {
 		kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
 	}
-	pods = kl.filterHostPortConflicts(pods)
+	kl.handleHostPortConflicts(pods)
 
 	ch := make(chan RunPodResult)
 	for i := range pods {
diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go
index ab95f3a2d04..b835bb12099 100644
--- a/pkg/master/pod_cache.go
+++ b/pkg/master/pod_cache.go
@@ -213,16 +213,8 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 		newStatus.HostIP = p.getHostAddress(nodeStatus.Addresses)
 		newStatus.Info = result.Status.Info
 		newStatus.PodIP = result.Status.PodIP
-		if newStatus.Info == nil {
-			// There is a small race window that kubelet couldn't
-			// propulated the status yet. This should go away once
-			// we removed boundPods
-			newStatus.Phase = api.PodPending
-			newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
-		} else {
-			newStatus.Phase = result.Status.Phase
-			newStatus.Conditions = result.Status.Conditions
-		}
+		newStatus.Phase = result.Status.Phase
+		newStatus.Conditions = result.Status.Conditions
 	}
 	return newStatus, err
 }
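
Note: the sketch below is illustrative only and is not part of the patch. It walks
through the lifecycle of a status-map entry using the helpers introduced above,
assuming a *Kubelet named kl wired up as in newTestKubelet and a hypothetical pod
name; the surrounding test scaffolding and error handling are omitted.

	// Record a failed status for a hypothetical rejected pod, as
	// handleHostPortConflicts would on a host port conflict.
	podFullName := "rejectedpod_default"
	kl.setPodStatusInCache(podFullName, api.PodStatus{
		Phase:   api.PodFailed,
		Message: "Pod cannot be started due to host port conflict",
	})

	// The cached entry is what GetPodStatus will return for this pod,
	// without querying docker.
	if status, ok := kl.getPodStatusFromCache(podFullName); ok {
		glog.Infof("cached phase: %v", status.Phase) // api.PodFailed
	}

	// Once the pod is no longer bound to this node, SyncPods passes the
	// remaining bound pod names to removeOrphanedStatuses and the entry
	// is dropped.
	kl.removeOrphanedStatuses(map[string]bool{})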