Running node selector predicate on kubelet.
Added a check on the kubelet that scheduled pods have a matching node selector. This is the last step to fix #5207.
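The predicate this change runs on the kubelet is the scheduler's node-selector test: a pod fits a node only if every key/value pair in the pod's node selector is also present in the node's labels. A minimal stand-alone sketch of that subset check follows (illustrative only; the function and variable names below are hypothetical, not the scheduler.PodMatchesNodeLabels implementation):

```go
package main

import "fmt"

// podMatchesNodeLabels is a simplified stand-in for the scheduler predicate:
// every key/value pair in the pod's node selector must appear in the node's labels.
func podMatchesNodeLabels(nodeSelector, nodeLabels map[string]string) bool {
	for key, want := range nodeSelector {
		if got, ok := nodeLabels[key]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	nodeLabels := map[string]string{"disktype": "ssd", "zone": "us-east-1a"}

	fmt.Println(podMatchesNodeLabels(map[string]string{"disktype": "ssd"}, nodeLabels)) // true
	fmt.Println(podMatchesNodeLabels(map[string]string{"disktype": "hdd"}, nodeLabels)) // false
	fmt.Println(podMatchesNodeLabels(map[string]string{}, nodeLabels))                  // true: an empty selector matches any node
}
```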
@@ -170,6 +170,27 @@ func NewMainKubelet(
 	}
 	serviceLister := &cache.StoreToServiceLister{serviceStore}
 
+	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
+	if kubeClient != nil {
+		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
+		// than an interface. There is no way to construct a list+watcher using resource name.
+		listWatch := &cache.ListWatch{
+			// TODO: currently, we are watching all nodes. To make it more efficient,
+			// we should be watching only a node with Name equal to kubelet's Hostname.
+			// To make it possible, we need to add field selector to ListFunc and WatchFunc,
+			// and selection by field needs to be implemented in WatchMinions function in pkg/registry/etcd.
+			ListFunc: func() (runtime.Object, error) {
+				return kubeClient.Nodes().List()
+			},
+			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
+				return kubeClient.Nodes().Watch(
+					labels.Everything(), fields.Everything(), resourceVersion)
+			},
+		}
+		cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
+	}
+	nodeLister := &cache.StoreToNodeLister{nodeStore}
+
 	containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
 	if err != nil {
 		return nil, err
@@ -197,6 +218,7 @@ func NewMainKubelet(
 		clusterDomain:          clusterDomain,
 		clusterDNS:             clusterDNS,
 		serviceLister:          serviceLister,
+		nodeLister:             nodeLister,
 		masterServiceNamespace: masterServiceNamespace,
 		prober:                 newProbeHolder(),
 		readiness:              newReadinessStates(),
@@ -244,6 +266,11 @@ type serviceLister interface {
 	List() (api.ServiceList, error)
 }
 
+type nodeLister interface {
+	List() (machines api.NodeList, err error)
+	GetNodeInfo(id string) (*api.Node, error)
+}
+
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
 	hostname string
@@ -306,6 +333,7 @@ type Kubelet struct {
 
 	masterServiceNamespace string
 	serviceLister          serviceLister
+	nodeLister             nodeLister
 
 	// Volume plugins.
 	volumePluginMgr volume.VolumePluginMgr
@@ -477,6 +505,20 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
 	return pods, nil
 }
 
+func (kl *Kubelet) GetNode() (*api.Node, error) {
+	l, err := kl.nodeLister.List()
+	if err != nil {
+		return nil, errors.New("cannot list nodes")
+	}
+	host := kl.GetHostname()
+	for _, n := range l.Items {
+		if n.Name == host {
+			return &n, nil
+		}
+	}
+	return nil, fmt.Errorf("node %v not found", host)
+}
+
 // Starts garbage collection theads.
 func (kl *Kubelet) StartGarbageCollection() {
 	go util.Forever(func() {
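The nodeLister interface and GetNode added above are small enough to exercise with a fake lister. The sketch below mirrors the lookup-by-hostname logic under simplified, hypothetical types (plain Node/NodeList structs instead of the api package), including the "node not found" error path:

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for api.Node and api.NodeList.
type Node struct{ Name string }
type NodeList struct{ Items []Node }

// nodeLister mirrors the interface added in this commit, with the simplified types above.
type nodeLister interface {
	List() (NodeList, error)
}

// fakeNodeLister is a hypothetical test double that returns a fixed node list.
type fakeNodeLister struct{ nodes NodeList }

func (f fakeNodeLister) List() (NodeList, error) { return f.nodes, nil }

// getNode mirrors Kubelet.GetNode: list all nodes and pick the one whose name matches the hostname.
func getNode(l nodeLister, hostname string) (*Node, error) {
	list, err := l.List()
	if err != nil {
		return nil, errors.New("cannot list nodes")
	}
	for i := range list.Items {
		if list.Items[i].Name == hostname {
			return &list.Items[i], nil
		}
	}
	return nil, fmt.Errorf("node %v not found", hostname)
}

func main() {
	lister := fakeNodeLister{nodes: NodeList{Items: []Node{{Name: "node-a"}, {Name: "node-b"}}}}

	if n, err := getNode(lister, "node-b"); err == nil {
		fmt.Println("found:", n.Name)
	}
	if _, err := getNode(lister, "node-c"); err != nil {
		fmt.Println("error:", err)
	}
}
```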
@@ -1501,7 +1543,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
 	kl.removeOrphanedStatuses(podFullNames)
 
 	// Filter out the rejected pod. They don't have running containers.
-	kl.handleNotfittingPods(allPods)
+	kl.handleNotFittingPods(allPods)
 	var pods []api.Pod
 	for _, pod := range allPods {
 		status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
@@ -1647,9 +1689,8 @@ func (s podsByCreationTime) Less(i, j int) bool {
 	return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
 }
 
-// getHostPortConflicts detects pods with conflicted host ports and return them.
-func getHostPortConflicts(pods []api.Pod) []api.Pod {
-	conflicts := []api.Pod{}
+// checkHostPortConflicts detects pods with conflicted host ports.
+func checkHostPortConflicts(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
 	ports := map[int]bool{}
 	extract := func(p *api.ContainerPort) int { return p.HostPort }
 
@@ -1660,48 +1701,65 @@ func getHostPortConflicts(pods []api.Pod) []api.Pod {
 		pod := &pods[i]
 		if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
 			glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
-			conflicts = append(conflicts, *pod)
+			notFitting = append(notFitting, *pod)
 			continue
 		}
+		fitting = append(fitting, *pod)
 	}
 
-	return conflicts
+	return
 }
 
-func (kl *Kubelet) getPodsExceedingCapacity(pods []api.Pod) []api.Pod {
+// checkCapacityExceeded detects pods that exceeds node's resources.
+func (kl *Kubelet) checkCapacityExceeded(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
 	info, err := kl.GetCachedMachineInfo()
 	if err != nil {
 		glog.Error("error getting machine info: %v", err)
-		return []api.Pod{}
+		return pods, []api.Pod{}
 	}
 
 	// Respect the pod creation order when resolving conflicts.
 	sort.Sort(podsByCreationTime(pods))
 
 	capacity := CapacityFromMachineInfo(info)
-	return scheduler.GetPodsExceedingCapacity(pods, capacity)
+	return scheduler.CheckPodsExceedingCapacity(pods, capacity)
 }
 
+// checkNodeSelectorMatching detects pods that do not match node's labels.
+func (kl *Kubelet) checkNodeSelectorMatching(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
+	node, err := kl.GetNode()
+	if err != nil {
+		glog.Errorf("error getting node: %v", err)
+		return pods, []api.Pod{}
+	}
+	for _, pod := range pods {
+		if !scheduler.PodMatchesNodeLabels(&pod, node) {
+			notFitting = append(notFitting, pod)
+			continue
+		}
+		fitting = append(fitting, pod)
+	}
+	return
+}
+
 // handleNotfittingPods handles pods that do not fit on the node.
-// Currently conflicts on Port.HostPort values and exceeding node capacity are handled.
-func (kl *Kubelet) handleNotfittingPods(pods []api.Pod) {
-	conflicts := getHostPortConflicts(pods)
-	conflictsMap := map[types.UID]bool{}
-	for _, pod := range conflicts {
+// Currently conflicts on Port.HostPort values, matching node's labels and exceeding node's capacity are handled.
+func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
+	fitting, notFitting := checkHostPortConflicts(pods)
+	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
 		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
 			Message: "Pod cannot be started due to host port conflict"})
-		conflictsMap[pod.UID] = true
 	}
-	remainingPods := []api.Pod{}
-	for _, pod := range pods {
-		if !conflictsMap[pod.UID] {
-			remainingPods = append(remainingPods, pod)
-		}
+	fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
+	for _, pod := range notFitting {
+		kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
+		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+			Phase:   api.PodFailed,
+			Message: "Pod cannot be started due to node selector mismatch"})
 	}
-	conflicts = kl.getPodsExceedingCapacity(remainingPods)
-	for _, pod := range conflicts {
+	fitting, notFitting = kl.checkCapacityExceeded(fitting)
+	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
 		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
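Taken together, handleNotFittingPods turns pod admission into a chain of filters: each check splits the remaining pods into (fitting, notFitting), the notFitting ones are failed with an event and a status message, and only the fitting ones are passed to the next check. The sketch below shows that chaining pattern in isolation, with hypothetical pod fields and toy host-port and capacity checks rather than the kubelet's real ones:

```go
package main

import "fmt"

type pod struct {
	name     string
	hostPort int
	cpu      int
}

// checkFunc splits pods into those that still fit and those that must be rejected,
// matching the (fitting, notFitting) shape used by the kubelet checks in this commit.
type checkFunc func(pods []pod) (fitting, notFitting []pod)

// reject stands in for the recorder.Eventf and setPodStatusInCache calls.
func reject(reason string, pods []pod) {
	for _, p := range pods {
		fmt.Printf("rejecting %s: %s\n", p.name, reason)
	}
}

func main() {
	// Toy check: a host port may be claimed by at most one pod.
	var checkHostPorts checkFunc = func(pods []pod) (fitting, notFitting []pod) {
		seen := map[int]bool{}
		for _, p := range pods {
			if p.hostPort != 0 && seen[p.hostPort] {
				notFitting = append(notFitting, p)
				continue
			}
			seen[p.hostPort] = true
			fitting = append(fitting, p)
		}
		return
	}
	// Toy check: total CPU of admitted pods must stay within a fixed capacity.
	var checkCapacity checkFunc = func(pods []pod) (fitting, notFitting []pod) {
		const capacity = 4
		used := 0
		for _, p := range pods {
			if used+p.cpu > capacity {
				notFitting = append(notFitting, p)
				continue
			}
			used += p.cpu
			fitting = append(fitting, p)
		}
		return
	}

	pods := []pod{{"a", 80, 2}, {"b", 80, 1}, {"c", 0, 3}}

	// Each stage only sees the pods that survived the previous one.
	fitting, notFitting := checkHostPorts(pods)
	reject("host port conflict", notFitting)
	fitting, notFitting = checkCapacity(fitting)
	reject("capacity exceeded", notFitting)
	fmt.Println("admitted:", fitting)
}
```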