Merge pull request #89974 from ahg-g/ahg-info

scheduler's NodeInfo tracks PodInfos instead of Pods
This commit is contained in:
Kubernetes Prow Robot 2020-04-09 09:19:58 -07:00 committed by GitHub
commit 0c9245a29f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 358 additions and 381 deletions

View File

@ -287,9 +287,9 @@ func (h *HTTPExtender) convertToNodeToVictims(
func (h *HTTPExtender) convertPodUIDToPod(
metaPod *extenderv1.MetaPod,
nodeInfo *framework.NodeInfo) (*v1.Pod, error) {
for _, pod := range nodeInfo.Pods() {
if string(pod.UID) == metaPod.UID {
return pod, nil
for _, p := range nodeInfo.Pods() {
if string(p.Pod.UID) == metaPod.UID {
return p.Pod, nil
}
}
return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",

View File

@ -226,9 +226,9 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if podutil.GetPodPriority(p) < podPriority {
potentialVictims = append(potentialVictims, p)
removePod(p)
if podutil.GetPodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
removePod(p.Pod)
}
}
sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })

View File

@ -971,9 +971,9 @@ func (g *genericScheduler) selectVictimsOnNode(
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
if podutil.GetPodPriority(p) < podPriority {
potentialVictims = append(potentialVictims, p)
if err := removePod(p); err != nil {
if podutil.GetPodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
if err := removePod(p.Pod); err != nil {
return nil, 0, false
}
}
@ -1063,7 +1063,7 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister,
if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && podutil.GetPodPriority(p) < podPriority {
if p.Pod.DeletionTimestamp != nil && podutil.GetPodPriority(p.Pod) < podPriority {
// There is a terminating pod on the nominated node.
return false
}

View File

@ -200,11 +200,11 @@ func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *fra
return 0
}
count := 0
for _, pod := range nodeInfo.Pods() {
for _, p := range nodeInfo.Pods() {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if namespace == pod.Namespace && pod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(pod.Labels)) {
if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(p.Pod.Labels)) {
count++
}
}

View File

@ -229,7 +229,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod
return
}
for _, existingPod := range nodeInfo.PodsWithAffinity() {
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod.Pod, node)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
@ -293,10 +293,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.Pods() {
// Check affinity terms.
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod, node, affinityTerms, 1)
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, affinityTerms, 1)
// Check anti-affinity terms.
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod, node, antiAffinityTerms, 1)
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, antiAffinityTerms, 1)
}
if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {

View File

@ -247,7 +247,7 @@ func (pl *InterPodAffinity) PreScore(
topoScore := make(scoreMap)
for _, existingPod := range podsToProcess {
if err := pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore); err != nil {
if err := pl.processExistingPod(state, existingPod.Pod, nodeInfo, pod, topoScore); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

View File

@ -32,10 +32,10 @@ import (
)
// getExistingVolumeCountForNode gets the current number of volumes on node.
func getExistingVolumeCountForNode(pods []*v1.Pod, maxVolumes int) int {
func getExistingVolumeCountForNode(podInfos []*framework.PodInfo, maxVolumes int) int {
volumeCount := 0
for _, pod := range pods {
volumeCount += len(pod.Spec.Volumes)
for _, p := range podInfos {
volumeCount += len(p.Pod.Spec.Volumes)
}
if maxVolumes-volumeCount > 0 {
return maxVolumes - volumeCount

View File

@ -103,7 +103,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
attachedVolumes := make(map[string]string)
for _, existingPod := range nodeInfo.Pods() {
if err := pl.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
if err := pl.filterAttachableVolumes(csiNode, existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, attachedVolumes); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
}

View File

@ -236,7 +236,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
// count unique volumes
existingVolumes := make(map[string]bool)
for _, existingPod := range nodeInfo.Pods() {
if err := pl.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
}

View File

@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
type topologyPair struct {
@ -83,14 +84,14 @@ func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint,
return result, nil
}
func countPodsMatchSelector(pods []*v1.Pod, selector labels.Selector, ns string) int {
func countPodsMatchSelector(podInfos []*framework.PodInfo, selector labels.Selector, ns string) int {
count := 0
for _, p := range pods {
for _, p := range podInfos {
// Bypass terminating Pod (see #87621).
if p.DeletionTimestamp != nil || p.Namespace != ns {
if p.Pod.DeletionTimestamp != nil || p.Pod.Namespace != ns {
continue
}
if selector.Matches(labels.Set(p.Labels)) {
if selector.Matches(labels.Set(p.Pod.Labels)) {
count++
}
}

View File

@ -285,8 +285,8 @@ func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleStat
for _, existingPod := range nodeInfo.Pods() {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if pod.Namespace == existingPod.Namespace && existingPod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(existingPod.Labels)) {
if pod.Namespace == existingPod.Pod.Namespace && existingPod.Pod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(existingPod.Pod.Labels)) {
score++
}
}

View File

@ -120,7 +120,7 @@ func haveOverlap(a1, a2 []string) bool {
func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
if isVolumeConflict(v, ev.Pod) {
return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
}
}

View File

@ -210,32 +210,6 @@ type Plugin interface {
Name() string
}
// PodInfo is a wrapper to a Pod with additional information for purposes such as tracking
// the timestamp when it's added to the queue or recording per-pod metrics.
type PodInfo struct {
Pod *v1.Pod
// The time pod added to the scheduling queue.
Timestamp time.Time
// Number of schedule attempts before successfully scheduled.
// It's used to record the # attempts metric.
Attempts int
// The time when the pod is added to the queue for the first time. The pod may be added
// back to the queue multiple times before it's successfully scheduled.
// It shouldn't be updated once initialized. It's used to record the e2e scheduling
// latency for a pod.
InitialAttemptTimestamp time.Time
}
// DeepCopy returns a deep copy of the PodInfo object.
func (podInfo *PodInfo) DeepCopy() *PodInfo {
return &PodInfo{
Pod: podInfo.Pod.DeepCopy(),
Timestamp: podInfo.Timestamp,
Attempts: podInfo.Attempts,
InitialAttemptTimestamp: podInfo.InitialAttemptTimestamp,
}
}
// LessFunc is the function to sort pod info
type LessFunc func(podInfo1, podInfo2 *PodInfo) bool

View File

@ -21,8 +21,9 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
@ -36,6 +37,39 @@ var (
generation int64
)
// PodInfo is a wrapper to a Pod with additional information for purposes such as tracking
// the timestamp when it's added to the queue or recording per-pod metrics.
type PodInfo struct {
Pod *v1.Pod
// The time pod added to the scheduling queue.
Timestamp time.Time
// Number of schedule attempts before successfully scheduled.
// It's used to record the # attempts metric.
Attempts int
// The time when the pod is added to the queue for the first time. The pod may be added
// back to the queue multiple times before it's successfully scheduled.
// It shouldn't be updated once initialized. It's used to record the e2e scheduling
// latency for a pod.
InitialAttemptTimestamp time.Time
}
// DeepCopy returns a deep copy of the PodInfo object.
func (podInfo *PodInfo) DeepCopy() *PodInfo {
return &PodInfo{
Pod: podInfo.Pod.DeepCopy(),
Timestamp: podInfo.Timestamp,
Attempts: podInfo.Attempts,
InitialAttemptTimestamp: podInfo.InitialAttemptTimestamp,
}
}
// NewPodInfo return a new PodInfo
func NewPodInfo(pod *v1.Pod) *PodInfo {
return &PodInfo{
Pod: pod,
}
}
// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct {
// Size of the image
@ -49,8 +83,8 @@ type NodeInfo struct {
// Overall node information.
node *v1.Node
pods []*v1.Pod
podsWithAffinity []*v1.Pod
pods []*PodInfo
podsWithAffinity []*PodInfo
usedPorts HostPortInfo
// Total requested resources of all pods on this node. This includes assumed
@ -290,18 +324,13 @@ func (n *NodeInfo) Node() *v1.Node {
}
// Pods return all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) Pods() []*v1.Pod {
func (n *NodeInfo) Pods() []*PodInfo {
if n == nil {
return nil
}
return n.pods
}
// SetPods sets all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) SetPods(pods []*v1.Pod) {
n.pods = pods
}
// UsedPorts returns used ports on this node.
func (n *NodeInfo) UsedPorts() HostPortInfo {
if n == nil {
@ -329,7 +358,7 @@ func (n *NodeInfo) SetImageStates(newImageStates map[string]*ImageStateSummary)
}
// PodsWithAffinity return all pods with (anti)affinity constraints on this node.
func (n *NodeInfo) PodsWithAffinity() []*v1.Pod {
func (n *NodeInfo) PodsWithAffinity() []*PodInfo {
if n == nil {
return nil
}
@ -427,7 +456,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
generation: n.generation,
}
if len(n.pods) > 0 {
clone.pods = append([]*v1.Pod(nil), n.pods...)
clone.pods = append([]*PodInfo(nil), n.pods...)
}
if len(n.usedPorts) > 0 {
// HostPortInfo is a map-in-map struct
@ -440,7 +469,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
}
}
if len(n.podsWithAffinity) > 0 {
clone.podsWithAffinity = append([]*v1.Pod(nil), n.podsWithAffinity...)
clone.podsWithAffinity = append([]*PodInfo(nil), n.podsWithAffinity...)
}
if len(n.taints) > 0 {
clone.taints = append([]v1.Taint(nil), n.taints...)
@ -462,8 +491,8 @@ func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
// String returns representation of human readable format of this NodeInfo.
func (n *NodeInfo) String() string {
podKeys := make([]string, len(n.pods))
for i, pod := range n.pods {
podKeys[i] = pod.Name
for i, p := range n.pods {
podKeys[i] = p.Pod.Name
}
return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}",
podKeys, n.requestedResource, n.nonzeroRequest, n.usedPorts, n.allocatableResource)
@ -476,7 +505,9 @@ func hasPodAffinityConstraints(pod *v1.Pod) bool {
// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
res, non0CPU, non0Mem := calculateResource(pod)
// TODO(#89528): AddPod should accept a PodInfo as an input argument.
podInfo := NewPodInfo(pod)
res, non0CPU, non0Mem := calculateResource(podInfo.Pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
n.requestedResource.EphemeralStorage += res.EphemeralStorage
@ -488,13 +519,13 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
}
n.nonzeroRequest.MilliCPU += non0CPU
n.nonzeroRequest.Memory += non0Mem
n.pods = append(n.pods, pod)
if hasPodAffinityConstraints(pod) {
n.podsWithAffinity = append(n.podsWithAffinity, pod)
n.pods = append(n.pods, podInfo)
if hasPodAffinityConstraints(podInfo.Pod) {
n.podsWithAffinity = append(n.podsWithAffinity, podInfo)
}
// Consume ports when pods added.
n.UpdateUsedPorts(pod, true)
n.UpdateUsedPorts(podInfo.Pod, true)
n.generation = nextGeneration()
}
@ -507,7 +538,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
}
for i := range n.podsWithAffinity {
k2, err := GetPodKey(n.podsWithAffinity[i])
k2, err := GetPodKey(n.podsWithAffinity[i].Pod)
if err != nil {
klog.Errorf("Cannot get pod key, err: %v", err)
continue
@ -520,7 +551,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
}
}
for i := range n.pods {
k2, err := GetPodKey(n.pods[i])
k2, err := GetPodKey(n.pods[i].Pod)
if err != nil {
klog.Errorf("Cannot get pod key, err: %v", err)
continue
@ -656,7 +687,7 @@ func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
continue
}
for _, np := range n.Pods() {
npodkey, _ := GetPodKey(np)
npodkey, _ := GetPodKey(np.Pod)
if npodkey == podKey {
filtered = append(filtered, p)
break
@ -675,21 +706,6 @@ func GetPodKey(pod *v1.Pod) (string, error) {
return uid, nil
}
// Filter implements PodFilter interface. It returns false only if the pod node name
// matches NodeInfo.node and the pod is not found in the pods list. Otherwise,
// returns true.
func (n *NodeInfo) Filter(pod *v1.Pod) bool {
if pod.Spec.NodeName != n.node.Name {
return true
}
for _, p := range n.pods {
if p.Name == pod.Name && p.Namespace == pod.Namespace {
return true
}
}
return false
}
// DefaultBindAllHostIP defines the default ip address used to bind to all host.
const DefaultBindAllHostIP = "0.0.0.0"

View File

@ -309,59 +309,63 @@ func TestNewNodeInfo(t *testing.T) {
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
pods: []*PodInfo{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
NodeName: nodeName,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
NodeName: nodeName,
},
},
},
@ -398,59 +402,63 @@ func TestNodeInfoClone(t *testing.T) {
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
pods: []*PodInfo{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
NodeName: nodeName,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
NodeName: nodeName,
},
},
},
@ -468,59 +476,63 @@ func TestNodeInfoClone(t *testing.T) {
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
pods: []*PodInfo{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
NodeName: nodeName,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
NodeName: nodeName,
},
NodeName: nodeName,
},
},
},
@ -633,64 +645,68 @@ func TestNodeInfoAddPod(t *testing.T) {
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
pods: []*PodInfo{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
},
},
},
},
},
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
},
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
},
@ -717,6 +733,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
nodeName := "test-node"
pods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
}
@ -766,66 +783,70 @@ func TestNodeInfoRemovePod(t *testing.T) {
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
pods: []*PodInfo{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-1",
UID: types.UID("test-1"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 80,
Protocol: "TCP",
},
},
},
},
},
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
},
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
},
@ -894,35 +915,37 @@ func TestNodeInfoRemovePod(t *testing.T) {
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
pods: []*PodInfo{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "node_info_cache_test",
Name: "test-2",
UID: types.UID("test-2"),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
Ports: []v1.ContainerPort{
{
HostIP: "127.0.0.1",
HostPort: 8080,
Protocol: "TCP",
},
},
},
},
},
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
NodeName: nodeName,
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("500"),
},
},
},
},

View File

@ -314,12 +314,7 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
}
}
func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
alwaysTrue := func(p *v1.Pod) bool { return true }
return cache.FilteredList(alwaysTrue, selector)
}
func (cache *schedulerCache) FilteredList(podFilter framework.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
@ -331,9 +326,9 @@ func (cache *schedulerCache) FilteredList(podFilter framework.PodFilter, selecto
}
pods := make([]*v1.Pod, 0, maxSize)
for _, n := range cache.nodes {
for _, pod := range n.info.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
for _, p := range n.info.Pods() {
if selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
}
}
}

View File

@ -927,42 +927,15 @@ func TestForgetPod(t *testing.T) {
}
}
// getResourceRequest returns the resource request of all containers in Pods;
// excluding initContainers.
func getResourceRequest(pod *v1.Pod) v1.ResourceList {
result := &framework.Resource{}
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Requests)
}
return result.ResourceList()
}
// buildNodeInfo creates a NodeInfo by simulating node operations in cache.
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *framework.NodeInfo {
expected := framework.NewNodeInfo()
// Simulate SetNode.
expected.SetNode(node)
expected.SetAllocatableResource(framework.NewResource(node.Status.Allocatable))
expected.SetTaints(node.Spec.Taints)
expected.SetGeneration(expected.GetGeneration() + 1)
for _, pod := range pods {
// Simulate AddPod
pods := append(expected.Pods(), pod)
expected.SetPods(pods)
requestedResource := expected.RequestedResource()
newRequestedResource := &requestedResource
newRequestedResource.Add(getResourceRequest(pod))
expected.SetRequestedResource(newRequestedResource)
nonZeroRequest := expected.NonZeroRequest()
newNonZeroRequest := &nonZeroRequest
newNonZeroRequest.Add(getResourceRequest(pod))
expected.SetNonZeroRequest(newNonZeroRequest)
expected.UpdateUsedPorts(pod, true)
expected.SetGeneration(expected.GetGeneration() + 1)
expected.AddPod(pod)
}
return expected

View File

@ -91,8 +91,8 @@ func (c *CacheComparer) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[s
cached := []string{}
for _, nodeinfo := range nodeinfos {
for _, pod := range nodeinfo.Pods() {
cached = append(cached, string(pod.UID))
for _, p := range nodeinfo.Pods() {
cached = append(cached, string(p.Pod.UID))
}
}
for _, pod := range waitingPods {

View File

@ -67,7 +67,7 @@ func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string {
n.Node().Name, n.RequestedResource(), n.AllocatableResource(), len(n.Pods())))
// Dumping Pod Info
for _, p := range n.Pods() {
nodeData.WriteString(printPod(p))
nodeData.WriteString(printPod(p.Pod))
}
// Dumping nominated pods info on the node
nominatedPods := d.podQueue.NominatedPodsForNode(n.Node().Name)

View File

@ -6,7 +6,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",

View File

@ -19,7 +19,6 @@ package fake
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -79,13 +78,8 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
return nil
}
// List is a fake method for testing.
func (c *Cache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
// FilteredList is a fake method for testing.
func (c *Cache) FilteredList(filter framework.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
return nil, nil
}
// ListPods is a fake method for testing.
func (c *Cache) ListPods(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
// Dump is a fake method for testing.
func (c *Cache) Dump() *internalcache.Dump {

View File

@ -17,7 +17,8 @@ limitations under the License.
package cache
import (
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
@ -56,7 +57,8 @@ import (
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
framework.PodLister
// ListPods lists all pods in the cache.
ListPods(selector labels.Selector) ([]*v1.Pod, error)
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).

View File

@ -157,9 +157,9 @@ func (p podLister) FilteredList(filter framework.PodFilter, selector labels.Sele
}
pods := make([]*v1.Pod, 0, maxSize)
for _, n := range p {
for _, pod := range n.Pods() {
if filter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
for _, p := range n.Pods() {
if filter(p.Pod) && selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
}
}
}

View File

@ -555,7 +555,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
return
default:
}
pods, err := scache.List(labels.Everything())
pods, err := scache.ListPods(labels.Everything())
if err != nil {
errChan <- fmt.Errorf("cache.List failed: %v", err)
return

View File

@ -424,7 +424,7 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt
// waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.SchedulerCache.List(labels.Everything())
cachedPods, err := testCtx.Scheduler.SchedulerCache.ListPods(labels.Everything())
if err != nil {
return false, err
}