kubelet: reject pods on host port conflict

When a host port conflict is detected, the kubelet should set the pod status to
failed. The failed status will then be polled by other components at a later time,
which allows the replication controller to create a new pod if necessary.

To achieve this, the change stores the pod status information in a status map
upon detection of a port conflict. GetPodStatus() consults this status map
before attempting to query Docker. Entries in the status map are removed
when the pod is no longer bound to the node.
Yu-Ju Hong 2015-03-03 10:33:25 -08:00
parent ca265c5705
commit 3ccdb8db98
4 changed files with 148 additions and 65 deletions
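
Before the diff, a minimal, self-contained Go sketch of the mechanism described above may help: rejected pods get a Failed status recorded in a lock-protected map, status lookups consult that map before falling back to the container runtime, and stale entries are pruned when pods are no longer bound to the node. All types and names in the sketch (Pod, PodStatus, statusCache, handleHostPortConflicts, and so on) are simplified illustrations of the pattern, not the kubelet's actual API.

// Minimal sketch of the rejected-pod status cache. Pod, PodStatus, and all
// names below are simplified stand-ins, not the real Kubernetes api types.
package main

import (
	"fmt"
	"sync"
)

type PodStatus struct {
	Phase   string // "Pending", "Failed", ...
	Message string
}

type Pod struct {
	Name      string
	HostPorts []int
}

// statusCache stores statuses for pods the kubelet has rejected.
type statusCache struct {
	mu       sync.RWMutex
	statuses map[string]PodStatus
}

func newStatusCache() *statusCache {
	return &statusCache{statuses: make(map[string]PodStatus)}
}

func (c *statusCache) get(name string) (PodStatus, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.statuses[name]
	return s, ok
}

func (c *statusCache) set(name string, s PodStatus) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[name] = s
}

// removeOrphaned drops entries for pods no longer bound to the node.
func (c *statusCache) removeOrphaned(bound map[string]bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for name := range c.statuses {
		if !bound[name] {
			delete(c.statuses, name)
		}
	}
}

// handleHostPortConflicts records a Failed status for any pod whose host port
// is already taken. Pods are assumed to be sorted oldest-first, so the newer
// pod in a conflicting pair is the one rejected.
func handleHostPortConflicts(cache *statusCache, pods []Pod) {
	used := map[int]bool{}
	for _, pod := range pods {
		conflict := false
		for _, port := range pod.HostPorts {
			if used[port] {
				conflict = true
			}
			used[port] = true
		}
		if conflict {
			cache.set(pod.Name, PodStatus{
				Phase:   "Failed",
				Message: "Pod cannot be started due to host port conflict",
			})
		}
	}
}

// getPodStatus checks the cache first; only on a miss would the kubelet go on
// to query the container runtime for the pod's containers.
func getPodStatus(cache *statusCache, name string) PodStatus {
	if s, ok := cache.get(name); ok {
		return s
	}
	return PodStatus{Phase: "Pending"} // placeholder for the runtime lookup
}

func main() {
	cache := newStatusCache()
	pods := []Pod{
		{Name: "oldpod", HostPorts: []int{80}},
		{Name: "newpod", HostPorts: []int{80}},
	}
	handleHostPortConflicts(cache, pods)
	fmt.Println(getPodStatus(cache, "newpod")) // {Failed Pod cannot be started due to host port conflict}

	// After a sync in which only "oldpod" is still bound, the stale entry goes away.
	cache.removeOrphaned(map[string]bool{"oldpod": true})
	fmt.Println(getPodStatus(cache, "newpod")) // {Pending }
}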

View File

@@ -155,6 +155,8 @@ func NewMainKubelet(
return nil, err
}
klet.podStatuses = make(map[string]api.PodStatus)
return klet, nil
}
@@ -235,6 +237,10 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder
// A pod status cache currently used to store rejected pods and their statuses.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -456,6 +462,30 @@ func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
return kl.cadvisorClient
}
func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
kl.podStatusesLock.RLock()
defer kl.podStatusesLock.RUnlock()
status, ok := kl.podStatuses[podFullName]
return status, ok
}
func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
kl.podStatuses[podFullName] = status
}
func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
for key := range kl.podStatuses {
if _, ok := podFullNames[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(kl.podStatuses, key)
}
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
@@ -1277,10 +1307,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
podFullNames := make(map[string]bool)
for _, pod := range allPods {
podFullNames[GetPodFullName(&pod)] = true
}
kl.removeOrphanedStatuses(podFullNames)
// Filter out the rejected pods; they don't have running containers.
var pods []api.BoundPod
for _, pod := range allPods {
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
if ok && status.Phase == api.PodFailed {
continue
}
pods = append(pods, pod)
}
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
@@ -1404,9 +1452,9 @@ func (s podsByCreationTime) Less(i, j int) bool {
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
// getHostPortConflicts detects pods with conflicting host ports and returns them.
func getHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
conflicts := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.ContainerPort) int { return p.HostPort }
@@ -1416,15 +1464,24 @@ func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
// TODO: Set the pod status to fail.
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
conflicts = append(conflicts, *pod)
continue
}
filtered = append(filtered, *pod)
}
return filtered
return conflicts
}
// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
func (kl *Kubelet) handleHostPortConflicts(pods []api.BoundPod) {
conflicts := getHostPortConflicts(pods)
for _, pod := range conflicts {
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to host port conflict"})
}
}
func (kl *Kubelet) handleUpdate(u PodUpdate) {
@@ -1434,12 +1491,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@@ -1505,7 +1561,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
}
kl.pods = u.Pods
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
@@ -1514,9 +1570,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@@ -1714,6 +1769,12 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
}
// Check to see if the pod has been rejected.
mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
if ok {
return mappedPodStatus, nil
}
info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
if err != nil {

View File

@@ -83,6 +83,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
kubelet.serviceLister = testServiceLister{}
kubelet.readiness = newReadinessStates()
kubelet.recorder = recorder
kubelet.podStatuses = map[string]api.PodStatus{}
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
@@ -1204,35 +1205,6 @@ func TestMakePortsAndBindings(t *testing.T) {
}
}
func TestCheckHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
successCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
}
successCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
}
expected := append(successCaseAll, successCaseNew)
if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
failureCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
}
failureCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}
func TestFieldPath(t *testing.T) {
pod := &api.BoundPod{Spec: api.PodSpec{Containers: []api.Container{
{Name: "foo"},
@@ -3088,13 +3060,35 @@ func TestPortForward(t *testing.T) {
}
}
// Tests that upon host port conflict, the newer pod is removed.
func TestFilterHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
// Tests that host port conflicts are detected correctly.
func TestGetHostPortConflicts(t *testing.T) {
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
}
// Pods should not cause any conflict.
conflicts := getHostPortConflicts(pods)
if len(conflicts) != 0 {
t.Errorf("expected no conflicts, Got %#v", conflicts)
}
// Reuse the pod spec with the same port to create a conflict.
// The new pod should cause a conflict and be reported.
expected := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
pods = append(pods, expected)
if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.BoundPod{expected}) {
t.Errorf("expected %#v, Got %#v", expected, actual)
}
}
// Tests that we handle port conflicts correctly by setting the failed status in the status map.
func TestHandlePortConflicts(t *testing.T) {
kl, _, _ := newTestKubelet(t)
spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
var pods = []api.BoundPod{
pods := []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
@@ -3115,12 +3109,48 @@ func TestFilterHostPortConflicts(t *testing.T) {
// Make sure the BoundPods are in the reverse order of creation time.
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
filteredPods := kubelet.filterHostPortConflicts(pods)
if len(filteredPods) != 1 {
t.Fatalf("Expected one pod. Got pods %#v", filteredPods)
// The newer pod should be rejected.
conflictedPodName := GetPodFullName(&pods[0])
kl.handleHostPortConflicts(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
if filteredPods[0].Name != "oldpod" {
t.Fatalf("Expected pod %#v. Got pod %#v", pods[1], filteredPods[0])
// Check pod status stored in the status map.
status, ok := kl.podStatuses[conflictedPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
status, err := kl.GetPodStatus(conflictedPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
}
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
kl, _, _ := newTestKubelet(t)
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
// Run once to populate the status map.
kl.handleHostPortConflicts(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in the status map will be removed.
kl.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
}

View File

@@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
pods = kl.filterHostPortConflicts(pods)
kl.handleHostPortConflicts(pods)
ch := make(chan RunPodResult)
for i := range pods {

View File

@@ -213,16 +213,8 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
newStatus.HostIP = p.getHostAddress(nodeStatus.Addresses)
newStatus.Info = result.Status.Info
newStatus.PodIP = result.Status.PodIP
if newStatus.Info == nil {
// There is a small race window where the kubelet hasn't
// populated the status yet. This should go away once
// we remove boundPods
newStatus.Phase = api.PodPending
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
} else {
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
return newStatus, err
}