mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Merge pull request #5715 from jszczepkowski/kubelet-nodesel
Running node selector predicate on kubelet.
This commit is contained in:
commit
e8f70428fb
@ -19,6 +19,9 @@ package client
|
|||||||
import (
|
import (
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FakeNodes implements MinionInterface. Meant to be embedded into a struct to get a default
|
// FakeNodes implements MinionInterface. Meant to be embedded into a struct to get a default
|
||||||
@ -56,3 +59,8 @@ func (c *FakeNodes) Update(minion *api.Node) (*api.Node, error) {
|
|||||||
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-minion", Value: minion})
|
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-minion", Value: minion})
|
||||||
return &api.Node{}, nil
|
return &api.Node{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *FakeNodes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-minions", Value: resourceVersion})
|
||||||
|
return c.Fake.Watch, c.Fake.Err
|
||||||
|
}
|
||||||
|
@ -21,6 +21,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NodesInterface interface {
|
type NodesInterface interface {
|
||||||
@ -33,6 +36,7 @@ type NodeInterface interface {
|
|||||||
List() (*api.NodeList, error)
|
List() (*api.NodeList, error)
|
||||||
Delete(name string) error
|
Delete(name string) error
|
||||||
Update(*api.Node) (*api.Node, error)
|
Update(*api.Node) (*api.Node, error)
|
||||||
|
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodes implements NodesInterface
|
// nodes implements NodesInterface
|
||||||
@ -94,3 +98,15 @@ func (c *nodes) Update(minion *api.Node) (*api.Node, error) {
|
|||||||
err := c.r.Put().Resource(c.resourceName()).Name(minion.Name).Body(minion).Do().Into(result)
|
err := c.r.Put().Resource(c.resourceName()).Name(minion.Name).Body(minion).Do().Into(result)
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Watch returns a watch.Interface that watches the requested nodes.
|
||||||
|
func (c *nodes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return c.r.Get().
|
||||||
|
Prefix("watch").
|
||||||
|
Namespace(api.NamespaceAll).
|
||||||
|
Resource(c.resourceName()).
|
||||||
|
Param("resourceVersion", resourceVersion).
|
||||||
|
LabelsSelectorParam(api.LabelSelectorQueryParam(c.r.APIVersion()), label).
|
||||||
|
FieldsSelectorParam(api.FieldSelectorQueryParam(c.r.APIVersion()), field).
|
||||||
|
Watch()
|
||||||
|
}
|
||||||
|
@ -30,8 +30,11 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
|
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
|
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
|
||||||
@ -114,6 +117,10 @@ func (m *FakeNodeHandler) Update(node *api.Node) (*api.Node, error) {
|
|||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// FakeKubeletClient is a fake implementation of KubeletClient.
|
// FakeKubeletClient is a fake implementation of KubeletClient.
|
||||||
type FakeKubeletClient struct {
|
type FakeKubeletClient struct {
|
||||||
Status probe.Result
|
Status probe.Result
|
||||||
|
@ -170,6 +170,27 @@ func NewMainKubelet(
|
|||||||
}
|
}
|
||||||
serviceLister := &cache.StoreToServiceLister{serviceStore}
|
serviceLister := &cache.StoreToServiceLister{serviceStore}
|
||||||
|
|
||||||
|
serviceStore = cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
|
if kubeClient != nil {
|
||||||
|
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
|
||||||
|
// than an interface. There is no way to construct a list+watcher using resource name.
|
||||||
|
listWatch := &cache.ListWatch{
|
||||||
|
// TODO: currently, we are watching all nodes. To make it more efficient,
|
||||||
|
// we should be watching only a node with Name equal to kubelet's Hostname.
|
||||||
|
// To make it possible, we need to add field selector to ListFunc and WatchFunc,
|
||||||
|
// and selection by field needs to be implemented in WatchMinions function in pkg/registry/etcd.
|
||||||
|
ListFunc: func() (runtime.Object, error) {
|
||||||
|
return kubeClient.Nodes().List()
|
||||||
|
},
|
||||||
|
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
|
||||||
|
return kubeClient.Nodes().Watch(
|
||||||
|
labels.Everything(), fields.Everything(), resourceVersion)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
|
||||||
|
}
|
||||||
|
nodeLister := &cache.StoreToNodeLister{serviceStore}
|
||||||
|
|
||||||
containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
|
containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -197,6 +218,7 @@ func NewMainKubelet(
|
|||||||
clusterDomain: clusterDomain,
|
clusterDomain: clusterDomain,
|
||||||
clusterDNS: clusterDNS,
|
clusterDNS: clusterDNS,
|
||||||
serviceLister: serviceLister,
|
serviceLister: serviceLister,
|
||||||
|
nodeLister: nodeLister,
|
||||||
masterServiceNamespace: masterServiceNamespace,
|
masterServiceNamespace: masterServiceNamespace,
|
||||||
prober: newProbeHolder(),
|
prober: newProbeHolder(),
|
||||||
readiness: newReadinessStates(),
|
readiness: newReadinessStates(),
|
||||||
@ -244,6 +266,11 @@ type serviceLister interface {
|
|||||||
List() (api.ServiceList, error)
|
List() (api.ServiceList, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nodeLister interface {
|
||||||
|
List() (machines api.NodeList, err error)
|
||||||
|
GetNodeInfo(id string) (*api.Node, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Kubelet is the main kubelet implementation.
|
// Kubelet is the main kubelet implementation.
|
||||||
type Kubelet struct {
|
type Kubelet struct {
|
||||||
hostname string
|
hostname string
|
||||||
@ -306,6 +333,7 @@ type Kubelet struct {
|
|||||||
|
|
||||||
masterServiceNamespace string
|
masterServiceNamespace string
|
||||||
serviceLister serviceLister
|
serviceLister serviceLister
|
||||||
|
nodeLister nodeLister
|
||||||
|
|
||||||
// Volume plugins.
|
// Volume plugins.
|
||||||
volumePluginMgr volume.VolumePluginMgr
|
volumePluginMgr volume.VolumePluginMgr
|
||||||
@ -477,6 +505,20 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
|
|||||||
return pods, nil
|
return pods, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) GetNode() (*api.Node, error) {
|
||||||
|
l, err := kl.nodeLister.List()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.New("cannot list nodes")
|
||||||
|
}
|
||||||
|
host := kl.GetHostname()
|
||||||
|
for _, n := range l.Items {
|
||||||
|
if n.Name == host {
|
||||||
|
return &n, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("node %v not found", host)
|
||||||
|
}
|
||||||
|
|
||||||
// Starts garbage collection theads.
|
// Starts garbage collection theads.
|
||||||
func (kl *Kubelet) StartGarbageCollection() {
|
func (kl *Kubelet) StartGarbageCollection() {
|
||||||
go util.Forever(func() {
|
go util.Forever(func() {
|
||||||
@ -1495,7 +1537,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
|||||||
kl.removeOrphanedStatuses(podFullNames)
|
kl.removeOrphanedStatuses(podFullNames)
|
||||||
|
|
||||||
// Filter out the rejected pod. They don't have running containers.
|
// Filter out the rejected pod. They don't have running containers.
|
||||||
kl.handleNotfittingPods(allPods)
|
kl.handleNotFittingPods(allPods)
|
||||||
var pods []api.Pod
|
var pods []api.Pod
|
||||||
for _, pod := range allPods {
|
for _, pod := range allPods {
|
||||||
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
|
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
|
||||||
@ -1640,9 +1682,8 @@ func (s podsByCreationTime) Less(i, j int) bool {
|
|||||||
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
|
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getHostPortConflicts detects pods with conflicted host ports and return them.
|
// checkHostPortConflicts detects pods with conflicted host ports.
|
||||||
func getHostPortConflicts(pods []api.Pod) []api.Pod {
|
func checkHostPortConflicts(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
|
||||||
conflicts := []api.Pod{}
|
|
||||||
ports := map[int]bool{}
|
ports := map[int]bool{}
|
||||||
extract := func(p *api.ContainerPort) int { return p.HostPort }
|
extract := func(p *api.ContainerPort) int { return p.HostPort }
|
||||||
|
|
||||||
@ -1653,48 +1694,65 @@ func getHostPortConflicts(pods []api.Pod) []api.Pod {
|
|||||||
pod := &pods[i]
|
pod := &pods[i]
|
||||||
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
|
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
|
||||||
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
|
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
|
||||||
conflicts = append(conflicts, *pod)
|
notFitting = append(notFitting, *pod)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
fitting = append(fitting, *pod)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
return conflicts
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) getPodsExceedingCapacity(pods []api.Pod) []api.Pod {
|
// checkCapacityExceeded detects pods that exceeds node's resources.
|
||||||
|
func (kl *Kubelet) checkCapacityExceeded(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
|
||||||
info, err := kl.GetCachedMachineInfo()
|
info, err := kl.GetCachedMachineInfo()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error("error getting machine info: %v", err)
|
glog.Error("error getting machine info: %v", err)
|
||||||
return []api.Pod{}
|
return pods, []api.Pod{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Respect the pod creation order when resolving conflicts.
|
// Respect the pod creation order when resolving conflicts.
|
||||||
sort.Sort(podsByCreationTime(pods))
|
sort.Sort(podsByCreationTime(pods))
|
||||||
|
|
||||||
capacity := CapacityFromMachineInfo(info)
|
capacity := CapacityFromMachineInfo(info)
|
||||||
return scheduler.GetPodsExceedingCapacity(pods, capacity)
|
return scheduler.CheckPodsExceedingCapacity(pods, capacity)
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkNodeSelectorMatching detects pods that do not match node's labels.
|
||||||
|
func (kl *Kubelet) checkNodeSelectorMatching(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
|
||||||
|
node, err := kl.GetNode()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("error getting node: %v", err)
|
||||||
|
return pods, []api.Pod{}
|
||||||
|
}
|
||||||
|
for _, pod := range pods {
|
||||||
|
if !scheduler.PodMatchesNodeLabels(&pod, node) {
|
||||||
|
notFitting = append(notFitting, pod)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fitting = append(fitting, pod)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleNotfittingPods handles pods that do not fit on the node.
|
// handleNotfittingPods handles pods that do not fit on the node.
|
||||||
// Currently conflicts on Port.HostPort values and exceeding node capacity are handled.
|
// Currently conflicts on Port.HostPort values, matching node's labels and exceeding node's capacity are handled.
|
||||||
func (kl *Kubelet) handleNotfittingPods(pods []api.Pod) {
|
func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
|
||||||
conflicts := getHostPortConflicts(pods)
|
fitting, notFitting := checkHostPortConflicts(pods)
|
||||||
conflictsMap := map[types.UID]bool{}
|
for _, pod := range notFitting {
|
||||||
for _, pod := range conflicts {
|
|
||||||
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
|
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
|
||||||
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
|
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
|
||||||
Phase: api.PodFailed,
|
Phase: api.PodFailed,
|
||||||
Message: "Pod cannot be started due to host port conflict"})
|
Message: "Pod cannot be started due to host port conflict"})
|
||||||
conflictsMap[pod.UID] = true
|
|
||||||
}
|
}
|
||||||
remainingPods := []api.Pod{}
|
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
|
||||||
for _, pod := range pods {
|
for _, pod := range notFitting {
|
||||||
if !conflictsMap[pod.UID] {
|
kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
|
||||||
remainingPods = append(remainingPods, pod)
|
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
|
||||||
|
Phase: api.PodFailed,
|
||||||
|
Message: "Pod cannot be started due to node selector mismatch"})
|
||||||
}
|
}
|
||||||
}
|
fitting, notFitting = kl.checkCapacityExceeded(fitting)
|
||||||
conflicts = kl.getPodsExceedingCapacity(remainingPods)
|
for _, pod := range notFitting {
|
||||||
for _, pod := range conflicts {
|
|
||||||
kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
|
kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
|
||||||
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
|
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
|
||||||
Phase: api.PodFailed,
|
Phase: api.PodFailed,
|
||||||
|
@ -18,6 +18,7 @@ package kubelet
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@ -95,6 +96,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
kubelet.sourcesReady = func() bool { return true }
|
kubelet.sourcesReady = func() bool { return true }
|
||||||
kubelet.masterServiceNamespace = api.NamespaceDefault
|
kubelet.masterServiceNamespace = api.NamespaceDefault
|
||||||
kubelet.serviceLister = testServiceLister{}
|
kubelet.serviceLister = testServiceLister{}
|
||||||
|
kubelet.nodeLister = testNodeLister{}
|
||||||
kubelet.readiness = newReadinessStates()
|
kubelet.readiness = newReadinessStates()
|
||||||
kubelet.recorder = fakeRecorder
|
kubelet.recorder = fakeRecorder
|
||||||
kubelet.podStatuses = map[string]api.PodStatus{}
|
kubelet.podStatuses = map[string]api.PodStatus{}
|
||||||
@ -1812,6 +1814,20 @@ func (ls testServiceLister) List() (api.ServiceList, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testNodeLister struct {
|
||||||
|
nodes []api.Node
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
|
||||||
|
return nil, errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls testNodeLister) List() (api.NodeList, error) {
|
||||||
|
return api.NodeList{
|
||||||
|
Items: ls.nodes,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestMakeEnvironmentVariables(t *testing.T) {
|
func TestMakeEnvironmentVariables(t *testing.T) {
|
||||||
services := []api.Service{
|
services := []api.Service{
|
||||||
{
|
{
|
||||||
@ -2792,7 +2808,7 @@ func TestGetHostPortConflicts(t *testing.T) {
|
|||||||
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
|
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
|
||||||
}
|
}
|
||||||
// Pods should not cause any conflict.
|
// Pods should not cause any conflict.
|
||||||
conflicts := getHostPortConflicts(pods)
|
_, conflicts := checkHostPortConflicts(pods)
|
||||||
if len(conflicts) != 0 {
|
if len(conflicts) != 0 {
|
||||||
t.Errorf("expected no conflicts, Got %#v", conflicts)
|
t.Errorf("expected no conflicts, Got %#v", conflicts)
|
||||||
}
|
}
|
||||||
@ -2802,7 +2818,7 @@ func TestGetHostPortConflicts(t *testing.T) {
|
|||||||
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
|
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
|
||||||
}
|
}
|
||||||
pods = append(pods, expected)
|
pods = append(pods, expected)
|
||||||
if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.Pod{expected}) {
|
if _, actual := checkHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.Pod{expected}) {
|
||||||
t.Errorf("expected %#v, Got %#v", expected, actual)
|
t.Errorf("expected %#v, Got %#v", expected, actual)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2838,7 +2854,7 @@ func TestHandlePortConflicts(t *testing.T) {
|
|||||||
// The newer pod should be rejected.
|
// The newer pod should be rejected.
|
||||||
conflictedPodName := GetPodFullName(&pods[0])
|
conflictedPodName := GetPodFullName(&pods[0])
|
||||||
|
|
||||||
kl.handleNotfittingPods(pods)
|
kl.handleNotFittingPods(pods)
|
||||||
if len(kl.podStatuses) != 1 {
|
if len(kl.podStatuses) != 1 {
|
||||||
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
||||||
}
|
}
|
||||||
@ -2862,6 +2878,59 @@ func TestHandlePortConflicts(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that we handle not matching labels selector correctly by setting the failed status in status map.
|
||||||
|
func TestHandleNodeSelector(t *testing.T) {
|
||||||
|
testKubelet := newTestKubelet(t)
|
||||||
|
kl := testKubelet.kubelet
|
||||||
|
kl.nodeLister = testNodeLister{nodes: []api.Node{
|
||||||
|
{ObjectMeta: api.ObjectMeta{Name: "testnode", Labels: map[string]string{"key": "B"}}},
|
||||||
|
}}
|
||||||
|
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||||
|
pods := []api.Pod{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "123456789",
|
||||||
|
Name: "podA",
|
||||||
|
Namespace: "foo",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{NodeSelector: map[string]string{"key": "A"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "987654321",
|
||||||
|
Name: "podB",
|
||||||
|
Namespace: "foo",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{NodeSelector: map[string]string{"key": "B"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// The first pod should be rejected.
|
||||||
|
notfittingPodName := GetPodFullName(&pods[0])
|
||||||
|
|
||||||
|
kl.handleNotFittingPods(pods)
|
||||||
|
if len(kl.podStatuses) != 1 {
|
||||||
|
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
||||||
|
}
|
||||||
|
// Check pod status stored in the status map.
|
||||||
|
status, ok := kl.podStatuses[notfittingPodName]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName)
|
||||||
|
}
|
||||||
|
if status.Phase != api.PodFailed {
|
||||||
|
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we can retrieve the pod status from GetPodStatus().
|
||||||
|
kl.pods = pods
|
||||||
|
status, err := kl.GetPodStatus(notfittingPodName, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
|
||||||
|
}
|
||||||
|
if status.Phase != api.PodFailed {
|
||||||
|
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that we handle exceeded resources correctly by setting the failed status in status map.
|
// Tests that we handle exceeded resources correctly by setting the failed status in status map.
|
||||||
func TestHandleMemExceeded(t *testing.T) {
|
func TestHandleMemExceeded(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
@ -2897,7 +2966,7 @@ func TestHandleMemExceeded(t *testing.T) {
|
|||||||
// The newer pod should be rejected.
|
// The newer pod should be rejected.
|
||||||
notfittingPodName := GetPodFullName(&pods[0])
|
notfittingPodName := GetPodFullName(&pods[0])
|
||||||
|
|
||||||
kl.handleNotfittingPods(pods)
|
kl.handleNotFittingPods(pods)
|
||||||
if len(kl.podStatuses) != 1 {
|
if len(kl.podStatuses) != 1 {
|
||||||
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
||||||
}
|
}
|
||||||
@ -2931,7 +3000,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
|
|||||||
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
|
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
|
||||||
}
|
}
|
||||||
// Run once to populate the status map.
|
// Run once to populate the status map.
|
||||||
kl.handleNotfittingPods(pods)
|
kl.handleNotFittingPods(pods)
|
||||||
if len(kl.podStatuses) != 1 {
|
if len(kl.podStatuses) != 1 {
|
||||||
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []
|
|||||||
if kl.dockerPuller == nil {
|
if kl.dockerPuller == nil {
|
||||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
||||||
}
|
}
|
||||||
kl.handleNotfittingPods(pods)
|
kl.handleNotFittingPods(pods)
|
||||||
|
|
||||||
ch := make(chan RunPodResult)
|
ch := make(chan RunPodResult)
|
||||||
for i := range pods {
|
for i := range pods {
|
||||||
|
@ -78,6 +78,7 @@ func TestRunOnce(t *testing.T) {
|
|||||||
recorder: &record.FakeRecorder{},
|
recorder: &record.FakeRecorder{},
|
||||||
cadvisor: cadvisor,
|
cadvisor: cadvisor,
|
||||||
podStatuses: make(map[string]api.PodStatus),
|
podStatuses: make(map[string]api.PodStatus),
|
||||||
|
nodeLister: testNodeLister{},
|
||||||
}
|
}
|
||||||
|
|
||||||
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RESTStorageToNodes will take a RESTStorage object and return a client interface
|
// RESTStorageToNodes will take a RESTStorage object and return a client interface
|
||||||
@ -92,3 +93,8 @@ func (n *nodeAdaptor) Delete(name string) error {
|
|||||||
func (n *nodeAdaptor) Update(minion *api.Node) (*api.Node, error) {
|
func (n *nodeAdaptor) Update(minion *api.Node) (*api.Node, error) {
|
||||||
return nil, errors.New("direct update not implemented")
|
return nil, errors.New("direct update not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Watch watches for nodes.
|
||||||
|
func (n *nodeAdaptor) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return nil, errors.New("direct watch not implemented")
|
||||||
|
}
|
||||||
|
@ -101,8 +101,7 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) []api.Pod {
|
func CheckPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) (fitting []api.Pod, notFitting []api.Pod) {
|
||||||
exceedingPods := []api.Pod{}
|
|
||||||
totalMilliCPU := capacity.Cpu().MilliValue()
|
totalMilliCPU := capacity.Cpu().MilliValue()
|
||||||
totalMemory := capacity.Memory().Value()
|
totalMemory := capacity.Memory().Value()
|
||||||
milliCPURequested := int64(0)
|
milliCPURequested := int64(0)
|
||||||
@ -113,14 +112,15 @@ func GetPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) []api.P
|
|||||||
fitsMemory := totalMemory == 0 || (totalMemory-memoryRequested) >= podRequest.memory
|
fitsMemory := totalMemory == 0 || (totalMemory-memoryRequested) >= podRequest.memory
|
||||||
if !fitsCPU || !fitsMemory {
|
if !fitsCPU || !fitsMemory {
|
||||||
// the pod doesn't fit
|
// the pod doesn't fit
|
||||||
exceedingPods = append(exceedingPods, pods[ix])
|
notFitting = append(notFitting, pods[ix])
|
||||||
} else {
|
continue
|
||||||
|
}
|
||||||
// the pod fits
|
// the pod fits
|
||||||
milliCPURequested += podRequest.milliCPU
|
milliCPURequested += podRequest.milliCPU
|
||||||
memoryRequested += podRequest.memory
|
memoryRequested += podRequest.memory
|
||||||
|
fitting = append(fitting, pods[ix])
|
||||||
}
|
}
|
||||||
}
|
return
|
||||||
return exceedingPods
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodFitsResources calculates fit based on requested, rather than used resources
|
// PodFitsResources calculates fit based on requested, rather than used resources
|
||||||
@ -137,7 +137,8 @@ func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node
|
|||||||
pods := []api.Pod{}
|
pods := []api.Pod{}
|
||||||
copy(pods, existingPods)
|
copy(pods, existingPods)
|
||||||
pods = append(existingPods, pod)
|
pods = append(existingPods, pod)
|
||||||
if len(GetPodsExceedingCapacity(pods, info.Spec.Capacity)) > 0 {
|
_, exceeding := CheckPodsExceedingCapacity(pods, info.Spec.Capacity)
|
||||||
|
if len(exceeding) > 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -157,20 +158,24 @@ func NewSelectorMatchPredicate(info NodeInfo) FitPredicate {
|
|||||||
return selector.PodSelectorMatches
|
return selector.PodSelectorMatches
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func PodMatchesNodeLabels(pod *api.Pod, node *api.Node) bool {
|
||||||
|
if len(pod.Spec.NodeSelector) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
|
||||||
|
return selector.Matches(labels.Set(node.Labels))
|
||||||
|
}
|
||||||
|
|
||||||
type NodeSelector struct {
|
type NodeSelector struct {
|
||||||
info NodeInfo
|
info NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
|
func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
|
||||||
if len(pod.Spec.NodeSelector) == 0 {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
|
|
||||||
minion, err := n.info.GetNodeInfo(node)
|
minion, err := n.info.GetNodeInfo(node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return selector.Matches(labels.Set(minion.Labels)), nil
|
return PodMatchesNodeLabels(&pod, minion), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
|
func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user