dedupe pod resource request calculation

author: Todd Neal, 2023-03-08 20:52:21 -06:00
parent c67953a2d0
commit 4096c9209c
18 changed files with 980 additions and 274 deletions


@@ -24,54 +24,127 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// PodRequestsAndLimits returns a dictionary of all defined resources summed up for all
// containers of the pod. Pod overhead is added to the
// total container resource requests and to the total container limits which have a
// non-zero quantity.
func PodRequestsAndLimits(pod *v1.Pod) (reqs, limits v1.ResourceList) {
return PodRequestsAndLimitsReuse(pod, nil, nil)
}

// PodRequestsAndLimitsWithoutOverhead will create a dictionary of all defined resources summed up for all
// containers of the pod.
func PodRequestsAndLimitsWithoutOverhead(pod *v1.Pod) (reqs, limits v1.ResourceList) {
reqs = make(v1.ResourceList, 4)
limits = make(v1.ResourceList, 4)
podRequestsAndLimitsWithoutOverhead(pod, reqs, limits)
return reqs, limits
}

func podRequestsAndLimitsWithoutOverhead(pod *v1.Pod, reqs, limits v1.ResourceList) {
for _, container := range pod.Spec.Containers {
addResourceList(reqs, container.Resources.Requests)

// PodResourcesOptions controls the behavior of PodRequests and PodLimits.
type PodResourcesOptions struct {
// Reuse, if provided will be reused to accumulate resources and returned by the PodRequests or PodLimits
// functions. All existing values in Reuse will be lost.
Reuse v1.ResourceList
// InPlacePodVerticalScalingEnabled indicates that the in-place pod vertical scaling feature gate is enabled.
InPlacePodVerticalScalingEnabled bool
// ExcludeOverhead controls if pod overhead is excluded from the calculation.
ExcludeOverhead bool
// ContainerFn is called with the effective resources required for each container within the pod.
ContainerFn func(res v1.ResourceList, containerType podutil.ContainerType)
// NonMissingContainerRequests if provided will replace any missing container level requests for the specified resources
// with the given values. If the requests for those resources are explicitly set, even if zero, they will not be modified.
NonMissingContainerRequests v1.ResourceList
}

// PodRequests computes the pod requests per the PodResourcesOptions supplied. If PodResourcesOptions is nil, then
// the requests are returned including pod overhead. The computation is part of the API and must be reviewed
// as an API change.
func PodRequests(pod *v1.Pod, opts PodResourcesOptions) v1.ResourceList {
// attempt to reuse the maps if passed, or allocate otherwise
reqs := reuseOrClearResourceList(opts.Reuse)

var containerStatuses map[string]*v1.ContainerStatus
if opts.InPlacePodVerticalScalingEnabled {
containerStatuses = map[string]*v1.ContainerStatus{}
for i := range pod.Status.ContainerStatuses {
containerStatuses[pod.Status.ContainerStatuses[i].Name] = &pod.Status.ContainerStatuses[i]
}
}

for _, container := range pod.Spec.Containers {
containerReqs := container.Resources.Requests
if opts.InPlacePodVerticalScalingEnabled {
cs, found := containerStatuses[container.Name]
if found {
if pod.Status.Resize == v1.PodResizeStatusInfeasible {
containerReqs = cs.ResourcesAllocated
} else {
containerReqs = max(container.Resources.Requests, cs.ResourcesAllocated)
}
}
}
if len(opts.NonMissingContainerRequests) > 0 {
containerReqs = applyNonMissing(containerReqs, opts.NonMissingContainerRequests)
}
if opts.ContainerFn != nil {
opts.ContainerFn(containerReqs, podutil.Containers)
}
addResourceList(reqs, containerReqs)
}
// init containers define the minimum of any resource
// Note: In-place resize is not allowed for InitContainers, so no need to check for ResizeStatus value
for _, container := range pod.Spec.InitContainers {
containerReqs := container.Resources.Requests
if len(opts.NonMissingContainerRequests) > 0 {
containerReqs = applyNonMissing(containerReqs, opts.NonMissingContainerRequests)
}
if opts.ContainerFn != nil {
opts.ContainerFn(containerReqs, podutil.InitContainers)
}
maxResourceList(reqs, containerReqs)
}
// Add overhead for running a pod to the sum of requests if requested:
if !opts.ExcludeOverhead && pod.Spec.Overhead != nil {
addResourceList(reqs, pod.Spec.Overhead)
}
return reqs
}
// applyNonMissing will return a copy of the given resource list with any missing values replaced by the nonMissing values
func applyNonMissing(reqs v1.ResourceList, nonMissing v1.ResourceList) v1.ResourceList {
cp := v1.ResourceList{}
for k, v := range reqs {
cp[k] = v.DeepCopy()
}
for k, v := range nonMissing {
if _, found := reqs[k]; !found {
rk := cp[k]
rk.Add(v)
cp[k] = rk
}
}
return cp
}
// PodLimits computes the pod limits per the PodResourcesOptions supplied. If PodResourcesOptions is nil, then
// the limits are returned including pod overhead for any non-zero limits. The computation is part of the API and must be reviewed
// as an API change.
func PodLimits(pod *v1.Pod, opts PodResourcesOptions) v1.ResourceList {
// attempt to reuse the maps if passed, or allocate otherwise
limits := reuseOrClearResourceList(opts.Reuse)
for _, container := range pod.Spec.Containers {
if opts.ContainerFn != nil {
opts.ContainerFn(container.Resources.Limits, podutil.Containers)
}
addResourceList(limits, container.Resources.Limits)
}

// init containers define the minimum of any resource
for _, container := range pod.Spec.InitContainers {
maxResourceList(reqs, container.Resources.Requests)
if opts.ContainerFn != nil {
opts.ContainerFn(container.Resources.Limits, podutil.InitContainers)
}
maxResourceList(limits, container.Resources.Limits)
}
}
// PodRequestsAndLimitsReuse returns a dictionary of all defined resources summed up for all
// containers of the pod. Pod overhead is added to the
// total container resource requests and to the total container limits which have a
// non-zero quantity. The caller may avoid allocations of resource lists by passing
// a requests and limits list to the function, which will be cleared before use.
func PodRequestsAndLimitsReuse(pod *v1.Pod, reuseReqs, reuseLimits v1.ResourceList) (reqs, limits v1.ResourceList) {
// attempt to reuse the maps if passed, or allocate otherwise
reqs, limits = reuseOrClearResourceList(reuseReqs), reuseOrClearResourceList(reuseLimits)
podRequestsAndLimitsWithoutOverhead(pod, reqs, limits)
// Add overhead for running a pod
// to the sum of requests and to non-zero limits:
if pod.Spec.Overhead != nil {
addResourceList(reqs, pod.Spec.Overhead)
// Add overhead to non-zero limits if requested:
if !opts.ExcludeOverhead && pod.Spec.Overhead != nil {
for name, quantity := range pod.Spec.Overhead {
if value, ok := limits[name]; ok && !value.IsZero() {
value.Add(quantity)
@@ -80,19 +153,7 @@ func PodRequestsAndLimitsReuse(pod *v1.Pod, reuseReqs, reuseLimits v1.ResourceLi
}
}
return
return limits
}
// reuseOrClearResourceList is a helper for avoiding excessive allocations of
// resource lists within the inner loop of resource calculations.
func reuseOrClearResourceList(reuse v1.ResourceList) v1.ResourceList {
if reuse == nil {
return make(v1.ResourceList, 4)
}
for k := range reuse {
delete(reuse, k)
}
return reuse
}

// addResourceList adds the resources in newList to list.
@@ -116,6 +177,39 @@ func maxResourceList(list, newList v1.ResourceList) {
}
}
// max returns the result of max(a, b) for each named resource and is only used if we can't
// accumulate into an existing resource list
func max(a v1.ResourceList, b v1.ResourceList) v1.ResourceList {
result := v1.ResourceList{}
for key, value := range a {
if other, found := b[key]; found {
if value.Cmp(other) <= 0 {
result[key] = other.DeepCopy()
continue
}
}
result[key] = value.DeepCopy()
}
for key, value := range b {
if _, found := result[key]; !found {
result[key] = value.DeepCopy()
}
}
return result
}
// reuseOrClearResourceList is a helper for avoiding excessive allocations of
// resource lists within the inner loop of resource calculations.
func reuseOrClearResourceList(reuse v1.ResourceList) v1.ResourceList {
if reuse == nil {
return make(v1.ResourceList, 4)
}
for k := range reuse {
delete(reuse, k)
}
return reuse
}
// GetResourceRequestQuantity finds and returns the request quantity for a specific resource.
func GetResourceRequestQuantity(pod *v1.Pod, resourceName v1.ResourceName) resource.Quantity {
requestQuantity := resource.Quantity{}
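As an aside for reviewers, here is a minimal usage sketch of the PodRequests/PodLimits helpers introduced above. It is not part of this commit, the pod fixture and values are invented, and importing k8s.io/kubernetes/pkg packages from outside the main repository generally requires the full Kubernetes module:

	package main

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/api/resource"
		resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
	)

	func main() {
		pod := &v1.Pod{
			Spec: v1.PodSpec{
				InitContainers: []v1.Container{{
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
					},
				}},
				Containers: []v1.Container{{
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m")},
						Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
					},
				}},
				Overhead: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m")},
			},
		}

		// Requests include overhead by default: max(init, sum(containers)) + overhead = 2 + 250m = 2250m.
		reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})
		// Limits add overhead only to resources that already have a non-zero limit: 1 + 250m = 1250m.
		lims := resourcehelper.PodLimits(pod, resourcehelper.PodResourcesOptions{})
		// Overhead can be excluded, e.g. when sizing a sandbox.
		reqsNoOverhead := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{ExcludeOverhead: true})

		fmt.Println(reqs.Cpu().String(), lims.Cpu().String(), reqsNoOverhead.Cpu().String())
	}

The Reuse option covers the hot paths that previously called PodRequestsAndLimitsReuse: passing a pre-allocated v1.ResourceList lets callers avoid re-allocating the result map on every invocation.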


@@ -325,7 +325,8 @@ func TestPodRequestsAndLimits(t *testing.T) {
},
}
for idx, tc := range cases {
resRequests, resLimits := PodRequestsAndLimits(tc.pod)
resRequests := PodRequests(tc.pod, PodResourcesOptions{})
resLimits := PodLimits(tc.pod, PodResourcesOptions{})
if !equality.Semantic.DeepEqual(tc.expectedRequests, resRequests) {
t.Errorf("test case failure[%d]: %v, requests:\n expected:\t%v\ngot\t\t%v", idx, tc.cName, tc.expectedRequests, resRequests)
@@ -511,7 +512,8 @@ func TestPodRequestsAndLimitsWithoutOverhead(t *testing.T) {
},
}
for idx, tc := range cases {
resRequests, resLimits := PodRequestsAndLimitsWithoutOverhead(tc.pod)
resRequests := PodRequests(tc.pod, PodResourcesOptions{ExcludeOverhead: true})
resLimits := PodLimits(tc.pod, PodResourcesOptions{ExcludeOverhead: true})
if !equality.Semantic.DeepEqual(tc.expectedRequests, resRequests) {
t.Errorf("test case failure[%d]: %v, requests:\n expected:\t%v\ngot\t\t%v", idx, tc.name, tc.expectedRequests, resRequests)
@@ -572,3 +574,444 @@ func getPod(cname string, resources podResources) *v1.Pod {
},
}
}
func TestPodResourceRequests(t *testing.T) {
testCases := []struct {
description string
options PodResourcesOptions
overhead v1.ResourceList
podResizeStatus v1.PodResizeStatus
initContainers []v1.Container
containers []v1.Container
containerStatus []v1.ContainerStatus
expectedRequests v1.ResourceList
}{
{
description: "nil options, larger init container",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
},
},
{
description: "nil options, larger containers",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
},
{
description: "pod overhead excluded",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
options: PodResourcesOptions{
ExcludeOverhead: true,
},
overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
},
{
description: "pod overhead included",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("6"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
},
{
description: "resized, infeasible",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
podResizeStatus: v1.PodResizeStatusInfeasible,
options: PodResourcesOptions{InPlacePodVerticalScalingEnabled: true},
containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
},
},
},
containerStatus: []v1.ContainerStatus{
{
Name: "container-1",
ResourcesAllocated: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
{
description: "resized, no resize status",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
options: PodResourcesOptions{InPlacePodVerticalScalingEnabled: true},
containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
},
},
},
containerStatus: []v1.ContainerStatus{
{
Name: "container-1",
ResourcesAllocated: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
{
description: "resized, infeasible, feature gate disabled",
expectedRequests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
podResizeStatus: v1.PodResizeStatusInfeasible,
options: PodResourcesOptions{InPlacePodVerticalScalingEnabled: false},
containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
},
},
},
containerStatus: []v1.ContainerStatus{
{
Name: "container-1",
ResourcesAllocated: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
}
for _, tc := range testCases {
p := &v1.Pod{
Spec: v1.PodSpec{
Containers: tc.containers,
InitContainers: tc.initContainers,
Overhead: tc.overhead,
},
Status: v1.PodStatus{
ContainerStatuses: tc.containerStatus,
Resize: tc.podResizeStatus,
},
}
request := PodRequests(p, tc.options)
if !resourcesEqual(tc.expectedRequests, request) {
t.Errorf("[%s] expected requests = %v, got %v", tc.description, tc.expectedRequests, request)
}
}
}
func TestPodResourceRequestsReuse(t *testing.T) {
expectedRequests := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
}
p := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: expectedRequests,
},
},
},
},
}
opts := PodResourcesOptions{
Reuse: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("25"),
},
}
requests := PodRequests(p, opts)
if !resourcesEqual(expectedRequests, requests) {
t.Errorf("expected requests = %v, got %v", expectedRequests, requests)
}
// should re-use the maps we passed in
if !resourcesEqual(expectedRequests, opts.Reuse) {
t.Errorf("expected to re-use the requests")
}
}
func TestPodResourceLimits(t *testing.T) {
testCases := []struct {
description string
options PodResourcesOptions
overhead v1.ResourceList
initContainers []v1.Container
containers []v1.Container
expectedLimits v1.ResourceList
}{
{
description: "nil options, larger init container",
expectedLimits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
},
},
{
description: "nil options, larger containers",
expectedLimits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
},
{
description: "pod overhead excluded",
expectedLimits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
options: PodResourcesOptions{
ExcludeOverhead: true,
},
overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
},
{
description: "pod overhead included",
overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
expectedLimits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("6"),
// overhead is only added to non-zero limits, so there will be no expected memory limit
},
initContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
},
containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
},
}
for _, tc := range testCases {
p := &v1.Pod{
Spec: v1.PodSpec{
Containers: tc.containers,
InitContainers: tc.initContainers,
Overhead: tc.overhead,
},
}
limits := PodLimits(p, tc.options)
if !resourcesEqual(tc.expectedLimits, limits) {
t.Errorf("[%s] expected limits = %v, got %v", tc.description, tc.expectedLimits, limits)
}
}
}
func resourcesEqual(lhs, rhs v1.ResourceList) bool {
if len(lhs) != len(rhs) {
return false
}
for name, lhsv := range lhs {
rhsv, ok := rhs[name]
if !ok {
return false
}
if !lhsv.Equal(rhsv) {
return false
}
}
return true
}


@@ -21,6 +21,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
)
@@ -230,71 +232,17 @@ func (m *ManagerImpl) getNUMANodeIds(topology *pluginapi.TopologyInfo) []int {
}

func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
// for these device plugin resources, requests == limits
limits := resource.PodLimits(pod, resource.PodResourcesOptions{
ExcludeOverhead: true,
})
podResources := sets.NewString()

// Find the max request of a given resource across all init containers
initContainerRequests := make(map[string]int)
for _, container := range pod.Spec.InitContainers {
for resourceObj, requestedObj := range container.Resources.Limits {
resource := string(resourceObj)
requested := int(requestedObj.Value())
if !m.isDevicePluginResource(resource) {
continue
}
podResources.Insert(resource)
if _, exists := initContainerRequests[resource]; !exists {
initContainerRequests[resource] = requested
continue
}
if requested > initContainerRequests[resource] {
initContainerRequests[resource] = requested
}
}
}
// Compute the sum of requests across all app containers for a given resource
appContainerRequests := make(map[string]int)
for _, container := range pod.Spec.Containers {
for resourceObj, requestedObj := range container.Resources.Limits {
resource := string(resourceObj)
requested := int(requestedObj.Value())
if !m.isDevicePluginResource(resource) {
continue
}
podResources.Insert(resource)
appContainerRequests[resource] += requested
}
}
// Calculate podRequests as the max of init and app container requests for a given resource
podRequests := make(map[string]int)
for resourceName, quantity := range limits {
if !m.isDevicePluginResource(string(resourceName)) {
continue
}
podRequests[string(resourceName)] = int(quantity.Value())
}
for resource := range podResources {
_, initExists := initContainerRequests[resource]
_, appExists := appContainerRequests[resource]
if initExists && !appExists {
podRequests[resource] = initContainerRequests[resource]
continue
}
if !initExists && appExists {
podRequests[resource] = appContainerRequests[resource]
continue
} }
if initContainerRequests[resource] > appContainerRequests[resource] {
podRequests[resource] = initContainerRequests[resource]
continue
}
podRequests[resource] = appContainerRequests[resource]
}
return podRequests
}
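A hedged illustration of why the hand-rolled init/app accounting above could be removed: PodLimits already returns, per resource, the max of any init container and the sum of app containers. The sketch below is not part of the commit; the device resource name and pod fixture are hypothetical:

	package main

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		apiresource "k8s.io/apimachinery/pkg/api/resource"
		resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
	)

	func main() {
		const dev = v1.ResourceName("example.com/gpu") // hypothetical device plugin resource

		pod := &v1.Pod{
			Spec: v1.PodSpec{
				InitContainers: []v1.Container{{
					Resources: v1.ResourceRequirements{Limits: v1.ResourceList{dev: apiresource.MustParse("4")}},
				}},
				Containers: []v1.Container{
					{Resources: v1.ResourceRequirements{Limits: v1.ResourceList{dev: apiresource.MustParse("1")}}},
					{Resources: v1.ResourceRequirements{Limits: v1.ResourceList{dev: apiresource.MustParse("2")}}},
				},
			},
		}

		limits := resourcehelper.PodLimits(pod, resourcehelper.PodResourcesOptions{ExcludeOverhead: true})
		q := limits[dev]
		// max(init=4, sum(app)=1+2) == 4, the same value the removed per-container loops produced.
		fmt.Println(int(q.Value()))
	}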


@@ -24,10 +24,10 @@ import (
"strconv"

libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -119,8 +119,28 @@ func HugePageLimits(resourceList v1.ResourceList) map[int64]int64 {

// ResourceConfigForPod takes the input pod and outputs the cgroup resource config.
func ResourceConfigForPod(pod *v1.Pod, enforceCPULimits bool, cpuPeriod uint64, enforceMemoryQoS bool) *ResourceConfig {
inPlacePodVerticalScalingEnabled := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.InPlacePodVerticalScaling)

// sum requests and limits.
reqs, limits := resource.PodRequestsAndLimits(pod)
reqs := resource.PodRequests(pod, resource.PodResourcesOptions{
InPlacePodVerticalScalingEnabled: inPlacePodVerticalScalingEnabled,
})
// track if limits were applied for each resource.
memoryLimitsDeclared := true
cpuLimitsDeclared := true
limits := resource.PodLimits(pod, resource.PodResourcesOptions{
InPlacePodVerticalScalingEnabled: inPlacePodVerticalScalingEnabled,
ContainerFn: func(res v1.ResourceList, containerType podutil.ContainerType) {
if res.Cpu().IsZero() {
cpuLimitsDeclared = false
}
if res.Memory().IsZero() {
memoryLimitsDeclared = false
}
},
})
// map hugepage pagesize (bytes) to limits (bytes)
hugePageLimits := HugePageLimits(reqs)
cpuRequests := int64(0)
cpuLimits := int64(0)
@@ -139,48 +159,6 @@ func ResourceConfigForPod(pod *v1.Pod, enforceCPULimits bool, cpuPeriod uint64,
cpuShares := MilliCPUToShares(cpuRequests)
cpuQuota := MilliCPUToQuota(cpuLimits, int64(cpuPeriod))
// track if limits were applied for each resource.
memoryLimitsDeclared := true
cpuLimitsDeclared := true
// map hugepage pagesize (bytes) to limits (bytes)
hugePageLimits := map[int64]int64{}
for _, container := range pod.Spec.Containers {
if container.Resources.Limits.Cpu().IsZero() {
cpuLimitsDeclared = false
}
if container.Resources.Limits.Memory().IsZero() {
memoryLimitsDeclared = false
}
containerHugePageLimits := HugePageLimits(container.Resources.Requests)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.InPlacePodVerticalScaling) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
containerHugePageLimits = HugePageLimits(cs.ResourcesAllocated)
}
}
for k, v := range containerHugePageLimits {
if value, exists := hugePageLimits[k]; exists {
hugePageLimits[k] = value + v
} else {
hugePageLimits[k] = v
}
}
}
for _, container := range pod.Spec.InitContainers {
if container.Resources.Limits.Cpu().IsZero() {
cpuLimitsDeclared = false
}
if container.Resources.Limits.Memory().IsZero() {
memoryLimitsDeclared = false
}
containerHugePageLimits := HugePageLimits(container.Resources.Requests)
for k, v := range containerHugePageLimits {
if value, exists := hugePageLimits[k]; !exists || v > value {
hugePageLimits[k] = v
}
}
}
// quota is not capped when cfs quota is disabled
if !enforceCPULimits {
cpuQuota = int64(-1)


@@ -642,10 +642,26 @@ func TestHugePageLimits(t *testing.T) {
resultValue := HugePageLimits(resourceList)
if !reflect.DeepEqual(testcase.expected, resultValue) {
t.Errorf("unexpected result, expected: %v, actual: %v", testcase.expected, resultValue)
t.Errorf("unexpected result for HugePageLimits(), expected: %v, actual: %v", testcase.expected, resultValue)
}
// ensure ResourceConfigForPod uses HugePageLimits correctly internally
p := v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: resourceList,
},
},
},
},
}
resultValuePod := ResourceConfigForPod(&p, false, 0, false)
if !reflect.DeepEqual(testcase.expected, resultValuePod.HugePageLimit) {
t.Errorf("unexpected result for ResourceConfigForPod(), expected: %v, actual: %v", testcase.expected, resultValuePod)
} }
})
}
}


@@ -31,6 +31,7 @@ import (
units "github.com/docker/go-units"
libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/v1/resource"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubefeatures "k8s.io/kubernetes/pkg/features"
@@ -169,6 +170,7 @@ func (m *qosContainerManagerImpl) setHugePagesConfig(configs map[v1.PodQOSClass]
func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]*CgroupConfig) error {
pods := m.activePods()
burstablePodCPURequest := int64(0)
reuseReqs := make(v1.ResourceList, 4)
for i := range pods {
pod := pods[i]
qosClass := v1qos.GetPodQOS(pod)
@@ -176,7 +178,7 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
// we only care about the burstable qos tier
continue
}
req, _ := resource.PodRequestsAndLimits(pod)
req := resource.PodRequests(pod, resource.PodResourcesOptions{Reuse: reuseReqs})
if request, found := req[v1.ResourceCPU]; found {
burstablePodCPURequest += request.MilliValue()
}
@@ -202,6 +204,7 @@ func (m *qosContainerManagerImpl) getQoSMemoryRequests() map[v1.PodQOSClass]int6
// Sum the pod limits for pods in each QOS class
pods := m.activePods()
reuseReqs := make(v1.ResourceList, 4)
for _, pod := range pods {
podMemoryRequest := int64(0)
qosClass := v1qos.GetPodQOS(pod)
@@ -209,7 +212,7 @@ func (m *qosContainerManagerImpl) getQoSMemoryRequests() map[v1.PodQOSClass]int6
// limits are not set for Best Effort pods
continue
}
req, _ := resource.PodRequestsAndLimits(pod)
req := resource.PodRequests(pod, resource.PodResourcesOptions{Reuse: reuseReqs})
if request, found := req[v1.ResourceMemory]; found {
podMemoryRequest += request.Value()
}


@@ -29,10 +29,12 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
v1helper "k8s.io/component-helpers/scheduling/corev1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/utils/clock"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@@ -40,7 +42,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/clock"
)

const (
@@ -161,7 +162,7 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
// When node has memory pressure, check BestEffort Pod's toleration:
// admit it if tolerates memory pressure taint, fail for other tolerations, e.g. DiskPressure.
if v1helper.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &v1.Taint{
if corev1helpers.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &v1.Taint{
Key: v1.TaintNodeMemoryPressure,
Effect: v1.TaintEffectNoSchedule,
}) {
@@ -517,7 +518,7 @@ func (m *managerImpl) emptyDirLimitEviction(podStats statsapi.PodStats, pod *v1.
}

func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStats, pod *v1.Pod) bool {
_, podLimits := apiv1resource.PodRequestsAndLimits(pod)
podLimits := resourcehelper.PodLimits(pod, resourcehelper.PodResourcesOptions{})
_, found := podLimits[v1.ResourceEphemeralStorage]
if !found {
return false


@@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
)
@@ -41,7 +42,11 @@ func (m *kubeGenericRuntimeManager) convertOverheadToLinuxResources(pod *v1.Pod)
}

func (m *kubeGenericRuntimeManager) calculateSandboxResources(pod *v1.Pod) *runtimeapi.LinuxContainerResources {
req, lim := resourcehelper.PodRequestsAndLimitsWithoutOverhead(pod)
opts := resourcehelper.PodResourcesOptions{
ExcludeOverhead: true,
}
req := resourcehelper.PodRequests(pod, opts)
lim := resourcehelper.PodLimits(pod, opts)
var cpuRequest *resource.Quantity
if _, cpuRequestExists := req[v1.ResourceCPU]; cpuRequestExists {
cpuRequest = req.Cpu()


@@ -31,7 +31,8 @@ import (
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@@ -296,7 +297,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
var sizeLimit *resource.Quantity
if volumeSpec.Volume != nil {
if util.IsLocalEphemeralVolume(*volumeSpec.Volume) {
_, podLimits := apiv1resource.PodRequestsAndLimits(pod)
podLimits := resourcehelper.PodLimits(pod, resourcehelper.PodResourcesOptions{})
ephemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage]
sizeLimit = resource.NewQuantity(ephemeralStorageLimit.Value(), resource.BinarySI)
if volumeSpec.Volume.EmptyDir != nil &&


@@ -31,13 +31,14 @@ import (
quota "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/apiserver/pkg/quota/v1/generic"
"k8s.io/apiserver/pkg/util/feature"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/clock"
resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
api "k8s.io/kubernetes/pkg/apis/core"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/clock"
)

// the name used for object count quota
@@ -358,30 +359,11 @@ func PodUsageFunc(obj runtime.Object, clock clock.Clock) (corev1.ResourceList, e
return result, nil
}

requests := corev1.ResourceList{}
limits := corev1.ResourceList{}
// TODO: ideally, we have pod level requests and limits in the future.
for i := range pod.Spec.Containers {
containerRequests := pod.Spec.Containers[i].Resources.Requests
if feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[i].Name)
if ok && cs.ResourcesAllocated != nil {
containerRequests = quota.Max(containerRequests, cs.ResourcesAllocated)
}
}
requests = quota.Add(requests, containerRequests)
limits = quota.Add(limits, pod.Spec.Containers[i].Resources.Limits)
}
// InitContainers are run sequentially before other containers start, so the highest
// init container resource is compared against the sum of app containers to determine
// the effective usage for both requests and limits.
for i := range pod.Spec.InitContainers {
requests = quota.Max(requests, pod.Spec.InitContainers[i].Resources.Requests)
limits = quota.Max(limits, pod.Spec.InitContainers[i].Resources.Limits)
}
requests = quota.Add(requests, pod.Spec.Overhead)
limits = quota.Add(limits, pod.Spec.Overhead)

opts := resourcehelper.PodResourcesOptions{
InPlacePodVerticalScalingEnabled: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
}
requests := resourcehelper.PodRequests(pod, opts)
limits := resourcehelper.PodLimits(pod, opts)

result = quota.Add(result, podComputeUsageHelper(requests, limits))
return result, nil


@@ -24,6 +24,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1/resource"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@@ -160,20 +162,10 @@ func NewFit(plArgs runtime.Object, h framework.Handle, fts feature.Features) (fr
//
// Result: CPU: 3, Memory: 3G
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
// pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled
reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
result := &preFilterState{}
result.SetMaxResource(reqs)
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Requests)
}
// take max_resource(sum_pod, any_init_container)
for _, container := range pod.Spec.InitContainers {
result.SetMaxResource(container.Resources.Requests)
}
// If Overhead is being utilized, add to the total requests for the pod
if pod.Spec.Overhead != nil {
result.Add(pod.Spec.Overhead)
}
return result
}


@@ -18,7 +18,12 @@ package noderesources

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
@@ -108,29 +113,23 @@ func (r *resourceAllocationScorer) calculateResourceAllocatableRequest(nodeInfo

// calculatePodResourceRequest returns the total non-zero requests. If Overhead is defined for the pod
// the Overhead is added to the result.
// podResourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead
func (r *resourceAllocationScorer) calculatePodResourceRequest(pod *v1.Pod, resource v1.ResourceName) int64 {
var podRequest int64
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
value := schedutil.GetRequestForResource(resource, &container.Resources.Requests, !r.useRequested)
podRequest += value
}

for i := range pod.Spec.InitContainers {
initContainer := &pod.Spec.InitContainers[i]
value := schedutil.GetRequestForResource(resource, &initContainer.Resources.Requests, !r.useRequested)
if podRequest < value {
podRequest = value
}
}

// If Overhead is being utilized, add to the total requests for the pod
if pod.Spec.Overhead != nil {
if quantity, found := pod.Spec.Overhead[resource]; found {
podRequest += quantity.Value()
}
}

return podRequest
}

func (r *resourceAllocationScorer) calculatePodResourceRequest(pod *v1.Pod, resourceName v1.ResourceName) int64 {
opts := resourcehelper.PodResourcesOptions{
InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
}
if !r.useRequested {
opts.NonMissingContainerRequests = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(schedutil.DefaultMilliCPURequest, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(schedutil.DefaultMemoryRequest, resource.DecimalSI),
}
}
requests := resourcehelper.PodRequests(pod, opts)

quantity := requests[resourceName]
if resourceName == v1.ResourceCPU {
return quantity.MilliValue()
}
return quantity.Value()
}


@@ -0,0 +1,240 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package noderesources
import (
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/scheduler/util"
)
func TestResourceAllocationScorerCalculateRequests(t *testing.T) {
const oneMi = 1048576
tests := []struct {
name string
pod v1.Pod
expected map[v1.ResourceName]int64
}{
{
name: "overhead only",
pod: v1.Pod{
Spec: v1.PodSpec{
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: 1000,
v1.ResourceMemory: oneMi,
},
},
{
name: "1x requestless container",
pod: v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{},
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: util.DefaultMilliCPURequest,
v1.ResourceMemory: util.DefaultMemoryRequest,
},
},
{
name: "2x requestless container",
pod: v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{}, {},
},
},
},
// should accumulate once per container without a request
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: 2 * util.DefaultMilliCPURequest,
v1.ResourceMemory: 2 * util.DefaultMemoryRequest,
},
},
{
name: "request container + requestless container",
pod: v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
},
},
{},
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: 1000 + util.DefaultMilliCPURequest,
v1.ResourceMemory: oneMi + util.DefaultMemoryRequest,
},
},
{
name: "container + requestless container + overhead",
pod: v1.Pod{
Spec: v1.PodSpec{
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
},
},
{},
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: 2000 + util.DefaultMilliCPURequest,
v1.ResourceMemory: 2*oneMi + util.DefaultMemoryRequest,
},
},
{
name: "init container + container + requestless container + overhead",
pod: v1.Pod{
Spec: v1.PodSpec{
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
InitContainers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
},
},
{},
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: 4000,
v1.ResourceMemory: 2*oneMi + util.DefaultMemoryRequest,
},
},
{
name: "requestless init container + small init container + small container ",
pod: v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1m"),
v1.ResourceMemory: resource.MustParse("1"),
},
},
},
},
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3m"),
v1.ResourceMemory: resource.MustParse("3"),
},
},
},
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: util.DefaultMilliCPURequest,
v1.ResourceMemory: util.DefaultMemoryRequest,
},
},
{
name: "requestless init container + small init container + small container + requestless container ",
pod: v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1m"),
v1.ResourceMemory: resource.MustParse("1"),
},
},
},
},
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3m"),
v1.ResourceMemory: resource.MustParse("3"),
},
},
},
{},
},
},
},
expected: map[v1.ResourceName]int64{
v1.ResourceCPU: 3 + util.DefaultMilliCPURequest,
v1.ResourceMemory: 3 + util.DefaultMemoryRequest,
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var scorer resourceAllocationScorer
for n, exp := range tc.expected {
got := scorer.calculatePodResourceRequest(&tc.pod, n)
if got != exp {
t.Errorf("expected %s = %d, got %d", n, exp, got)
}
}
})
}
}


@@ -29,10 +29,11 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
quota "k8s.io/apiserver/pkg/quota/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@@ -726,40 +727,30 @@ func max(a, b int64) int64 {
return b
}

// resourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead
func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64) {
inPlacePodVerticalScalingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling)
resPtr := &res
for _, c := range pod.Spec.Containers {
req := c.Resources.Requests
if inPlacePodVerticalScalingEnabled {
cs, found := podutil.GetContainerStatus(pod.Status.ContainerStatuses, c.Name)
if found {
if pod.Status.Resize == v1.PodResizeStatusInfeasible {
req = cs.ResourcesAllocated
} else {
req = quota.Max(c.Resources.Requests, cs.ResourcesAllocated)
}
}
}
resPtr.Add(req)
non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&req)
non0CPU += non0CPUReq
non0Mem += non0MemReq
// No non-zero resources for GPUs or opaque resources.
}

// Note: In-place resize is not allowed for InitContainers, so no need to check for ResizeStatus value
for _, ic := range pod.Spec.InitContainers {
resPtr.SetMaxResource(ic.Resources.Requests)
non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&ic.Resources.Requests)
non0CPU = max(non0CPU, non0CPUReq)
non0Mem = max(non0Mem, non0MemReq)
}

// If Overhead is being utilized, add to the total requests for the pod
if pod.Spec.Overhead != nil {
resPtr.Add(pod.Spec.Overhead)
if _, found := pod.Spec.Overhead[v1.ResourceCPU]; found {
non0CPU += pod.Spec.Overhead.Cpu().MilliValue()
}
@@ -768,8 +759,9 @@ func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64)
if _, found := pod.Spec.Overhead[v1.ResourceMemory]; found {
non0Mem += pod.Spec.Overhead.Memory().Value()
}
}
return
}

func calculateResource(pod *v1.Pod) (Resource, int64, int64) {
var non0InitCPU, non0InitMem int64
var non0CPU, non0Mem int64
requests := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{
InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
ContainerFn: func(requests v1.ResourceList, containerType podutil.ContainerType) {
non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&requests)
switch containerType {
case podutil.Containers:
non0CPU += non0CPUReq
non0Mem += non0MemReq
case podutil.InitContainers:
non0InitCPU = max(non0InitCPU, non0CPUReq)
non0InitMem = max(non0InitMem, non0MemReq)
}
},
})

non0CPU = max(non0CPU, non0InitCPU)
non0Mem = max(non0Mem, non0InitMem)

// If Overhead is being utilized, add to the non-zero cpu/memory tracking for the pod. It has already been added
// into ScalarResources since it is part of requests
if pod.Spec.Overhead != nil {
if _, found := pod.Spec.Overhead[v1.ResourceCPU]; found {
non0CPU += pod.Spec.Overhead.Cpu().MilliValue()
}
if _, found := pod.Spec.Overhead[v1.ResourceMemory]; found {
non0Mem += pod.Spec.Overhead.Memory().Value()
}
}

var res Resource
res.Add(requests)
return res, non0CPU, non0Mem
}
// updateUsedPorts updates the UsedPorts of NodeInfo. // updateUsedPorts updates the UsedPorts of NodeInfo.


@@ -196,6 +196,7 @@ func podRequestsAndLimitsByLifecycle(pod *v1.Pod, reuseReqs, reuseLimits v1.Reso
return
}
reqs, limits = v1resource.PodRequestsAndLimitsReuse(pod, reuseReqs, reuseLimits)
reqs = v1resource.PodRequests(pod, v1resource.PodResourcesOptions{Reuse: reuseReqs})
limits = v1resource.PodLimits(pod, v1resource.PodResourcesOptions{Reuse: reuseLimits})
return
}


@@ -18,6 +18,7 @@ package util

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// For each of these resources, a container that doesn't request the resource explicitly
@@ -35,44 +36,46 @@ const (
DefaultMemoryRequest int64 = 200 * 1024 * 1024 // 200 MB
)

// GetNonzeroRequests returns the default cpu and memory resource request if none is found or
// what is provided on the request.
func GetNonzeroRequests(requests *v1.ResourceList) (int64, int64) {
return GetRequestForResource(v1.ResourceCPU, requests, true),
GetRequestForResource(v1.ResourceMemory, requests, true)
}

// GetRequestForResource returns the requested values unless nonZero is true and there is no defined request
// for CPU and memory.
// If nonZero is true and the resource has no defined request for CPU or memory, it returns a default value.
func GetRequestForResource(resource v1.ResourceName, requests *v1.ResourceList, nonZero bool) int64 {
if requests == nil {
return 0
}
switch resource {
case v1.ResourceCPU:
// Override if un-set, but not if explicitly set to zero
if _, found := (*requests)[v1.ResourceCPU]; !found && nonZero {
return DefaultMilliCPURequest
}
return requests.Cpu().MilliValue()
case v1.ResourceMemory:
// Override if un-set, but not if explicitly set to zero
if _, found := (*requests)[v1.ResourceMemory]; !found && nonZero {
return DefaultMemoryRequest
}
return requests.Memory().Value()
case v1.ResourceEphemeralStorage:
quantity, found := (*requests)[v1.ResourceEphemeralStorage]
if !found {
return 0
}
return quantity.Value()
default:
quantity, found := (*requests)[resource]
if !found {
return 0
}
return quantity.Value()
}
}

// GetNonzeroRequests returns the default cpu in milli-cpu and memory in bytes resource requests if none is found or
// what is provided on the request.
func GetNonzeroRequests(requests *v1.ResourceList) (int64, int64) {
cpu := GetRequestForResource(v1.ResourceCPU, requests, true)
mem := GetRequestForResource(v1.ResourceMemory, requests, true)
return cpu.MilliValue(), mem.Value()
}

// GetRequestForResource returns the requested values unless nonZero is true and there is no defined request
// for CPU and memory.
// If nonZero is true and the resource has no defined request for CPU or memory, it returns a default value.
func GetRequestForResource(resourceName v1.ResourceName, requests *v1.ResourceList, nonZero bool) resource.Quantity {
if requests == nil {
return resource.Quantity{}
}
switch resourceName {
case v1.ResourceCPU:
// Override if un-set, but not if explicitly set to zero
if _, found := (*requests)[v1.ResourceCPU]; !found && nonZero {
return *resource.NewMilliQuantity(DefaultMilliCPURequest, resource.DecimalSI)
}
return requests.Cpu().DeepCopy()
case v1.ResourceMemory:
// Override if un-set, but not if explicitly set to zero
if _, found := (*requests)[v1.ResourceMemory]; !found && nonZero {
return *resource.NewQuantity(DefaultMemoryRequest, resource.DecimalSI)
}
return requests.Memory().DeepCopy()
case v1.ResourceEphemeralStorage:
quantity, found := (*requests)[v1.ResourceEphemeralStorage]
if !found {
return resource.Quantity{}
}
return quantity.DeepCopy()
default:
quantity, found := (*requests)[resourceName]
if !found {
return resource.Quantity{}
}
return quantity.DeepCopy()
}
}
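For completeness, a small illustrative snippet (not part of the commit) showing how a caller consumes the Quantity-returning GetRequestForResource, written as if it lived in the same pkg/scheduler/util package; the helper name is hypothetical:

	package util

	import (
		v1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/api/resource"
	)

	// exampleUsage shows the caller-side conversion: CPU is read back via
	// MilliValue, everything else via Value.
	func exampleUsage() (int64, int64) {
		requests := v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("250m"),
			v1.ResourceMemory: resource.MustParse("64Mi"),
		}
		cpuQty := GetRequestForResource(v1.ResourceCPU, &requests, true)
		memQty := GetRequestForResource(v1.ResourceMemory, &requests, true)
		return cpuQty.MilliValue(), memQty.Value() // 250, 67108864
	}

GetNonzeroRequests wraps the same two calls and already returns (milliCPU, memoryBytes), which is why its existing callers did not need to change.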


@@ -165,7 +165,13 @@ func TestGetRequestForResource(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
realQuantity := GetRequestForResource(test.resource, &test.requests, test.nonZero)
assert.EqualValuesf(t, test.expectedQuantity, realQuantity, "Failed to test: %s", test.name)
var realQuantityI64 int64
if test.resource == v1.ResourceCPU {
realQuantityI64 = realQuantity.MilliValue()
} else {
realQuantityI64 = realQuantity.Value()
}
assert.EqualValuesf(t, test.expectedQuantity, realQuantityI64, "Failed to test: %s", test.name)
})
}
}


@@ -561,6 +561,8 @@ func PodValidateLimitFunc(limitRange *corev1.LimitRange, pod *api.Pod) error {
// enforce pod limits on init containers
if limitType == corev1.LimitTypePod {
// TODO: look into re-using resourcehelper.PodRequests/resourcehelper.PodLimits instead of duplicating
// that calculation
containerRequests, containerLimits := []api.ResourceList{}, []api.ResourceList{}
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]