This commit contains the following:

1. Scheduler bug fix + scheduler-focused E2E tests
2. Add cgroup v2 support for in-place pod resize
3. Enable full E2E pod resize tests for containerd >= 1.6.9, plus EventedPLEG-related changes

Co-Authored-By: Vinay Kulkarni <vskibum@gmail.com>
Authored by Chen Wang on 2022-11-04 13:53:49 -07:00, committed by vinay kulkarni
Parent: f2bd94a0de
Commit: 7db339dba2
23 changed files with 636 additions and 247 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/opencontainers/runc/libcontainer/cgroups/manager" "github.com/opencontainers/runc/libcontainer/cgroups/manager"
cgroupsystemd "github.com/opencontainers/runc/libcontainer/cgroups/systemd" cgroupsystemd "github.com/opencontainers/runc/libcontainer/cgroups/systemd"
libcontainerconfigs "github.com/opencontainers/runc/libcontainer/configs" libcontainerconfigs "github.com/opencontainers/runc/libcontainer/configs"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@ -48,6 +49,7 @@ const (
MemoryMin string = "memory.min" MemoryMin string = "memory.min"
// MemoryHigh is memory.high for cgroup v2 // MemoryHigh is memory.high for cgroup v2
MemoryHigh string = "memory.high" MemoryHigh string = "memory.high"
Cgroup2MaxCpuLimit string = "max"
) )
var RootCgroupName = CgroupName([]string{}) var RootCgroupName = CgroupName([]string{})
@ -559,85 +561,188 @@ func (m *cgroupManagerImpl) MemoryUsage(name CgroupName) (int64, error) {
return int64(val), err return int64(val), err
} }
// Get the memory limit in bytes applied to the cgroup // Convert cgroup v1 cpu.shares value to cgroup v2 cpu.weight
func (m *cgroupManagerImpl) GetCgroupMemoryConfig(name CgroupName) (uint64, error) { // https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2254-cgroup-v2#phase-1-convert-from-cgroups-v1-settings-to-v2
cgroupPaths := m.buildCgroupPaths(name) func CpuSharesToCpuWeight(cpuShares uint64) uint64 {
cgroupMemoryPath, found := cgroupPaths["memory"] return uint64((((cpuShares - 2) * 9999) / 262142) + 1)
if !found {
return 0, fmt.Errorf("failed to build memory cgroup fs path for cgroup %v", name)
}
memLimit, err := fscommon.GetCgroupParamUint(cgroupMemoryPath, "memory.limit_in_bytes")
if err != nil {
return 0, fmt.Errorf("failed to get memory.limit_in_bytes for cgroup %v: %v", name, err)
}
return memLimit, nil
} }
// Get the cpu quota, cpu period, and cpu shares applied to the cgroup // Convert cgroup v2 cpu.weight value to cgroup v1 cpu.shares
func (m *cgroupManagerImpl) GetCgroupCpuConfig(name CgroupName) (int64, uint64, uint64, error) { // https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2254-cgroup-v2#phase-1-convert-from-cgroups-v1-settings-to-v2
cgroupPaths := m.buildCgroupPaths(name) func CpuWeightToCpuShares(cpuWeight uint64) uint64 {
cgroupCpuPath, found := cgroupPaths["cpu"] return uint64((((cpuWeight - 1) * 262142) / 9999) + 2)
if !found { }
return 0, 0, 0, fmt.Errorf("failed to build CPU cgroup fs path for cgroup %v", name)
} func getCgroupv1CpuConfig(cgroupPath string) (*ResourceConfig, error) {
cpuQuotaStr, errQ := fscommon.GetCgroupParamString(cgroupCpuPath, "cpu.cfs_quota_us") cpuQuotaStr, errQ := fscommon.GetCgroupParamString(cgroupPath, "cpu.cfs_quota_us")
if errQ != nil { if errQ != nil {
return 0, 0, 0, fmt.Errorf("failed to read CPU quota for cgroup %v: %v", name, errQ) return nil, fmt.Errorf("failed to read CPU quota for cgroup %v: %v", cgroupPath, errQ)
} }
cpuQuota, errInt := strconv.ParseInt(cpuQuotaStr, 10, 64) cpuQuota, errInt := strconv.ParseInt(cpuQuotaStr, 10, 64)
if errInt != nil { if errInt != nil {
return 0, 0, 0, fmt.Errorf("failed to convert CPU quota as integer for cgroup %v: %v", name, errInt) return nil, fmt.Errorf("failed to convert CPU quota as integer for cgroup %v: %v", cgroupPath, errInt)
} }
cpuPeriod, errP := fscommon.GetCgroupParamUint(cgroupCpuPath, "cpu.cfs_period_us") cpuPeriod, errP := fscommon.GetCgroupParamUint(cgroupPath, "cpu.cfs_period_us")
if errP != nil { if errP != nil {
return 0, 0, 0, fmt.Errorf("failed to read CPU period for cgroup %v: %v", name, errP) return nil, fmt.Errorf("failed to read CPU period for cgroup %v: %v", cgroupPath, errP)
} }
cpuShares, errS := fscommon.GetCgroupParamUint(cgroupCpuPath, "cpu.shares") cpuShares, errS := fscommon.GetCgroupParamUint(cgroupPath, "cpu.shares")
if errP != nil { if errS != nil {
return 0, 0, 0, fmt.Errorf("failed to read CPU shares for cgroup %v: %v", name, errS) return nil, fmt.Errorf("failed to read CPU shares for cgroup %v: %v", cgroupPath, errS)
} }
return cpuQuota, cpuPeriod, cpuShares, nil return &ResourceConfig{CPUShares: &cpuShares, CPUQuota: &cpuQuota, CPUPeriod: &cpuPeriod}, nil
} }
// Set the memory limit in bytes applied to the cgroup func getCgroupv2CpuConfig(cgroupPath string) (*ResourceConfig, error) {
func (m *cgroupManagerImpl) SetCgroupMemoryConfig(name CgroupName, memoryLimit int64) error { var cpuLimitStr, cpuPeriodStr string
cpuLimitAndPeriod, err := fscommon.GetCgroupParamString(cgroupPath, "cpu.max")
if err != nil {
return nil, fmt.Errorf("failed to read cpu.max file for cgroup %v: %v", cgroupPath, err)
}
numItems, errScan := fmt.Sscanf(cpuLimitAndPeriod, "%s %s", &cpuLimitStr, &cpuPeriodStr)
if errScan != nil || numItems != 2 {
return nil, fmt.Errorf("failed to correctly parse content of cpu.max file ('%s') for cgroup %v: %v",
cpuLimitAndPeriod, cgroupPath, errScan)
}
cpuLimit := int64(-1)
if cpuLimitStr != Cgroup2MaxCpuLimit {
cpuLimit, err = strconv.ParseInt(cpuLimitStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to convert CPU limit as integer for cgroup %v: %v", cgroupPath, err)
}
}
cpuPeriod, errPeriod := strconv.ParseUint(cpuPeriodStr, 10, 64)
if errPeriod != nil {
return nil, fmt.Errorf("failed to convert CPU period as integer for cgroup %v: %v", cgroupPath, errPeriod)
}
cpuWeight, errWeight := fscommon.GetCgroupParamUint(cgroupPath, "cpu.weight")
if errWeight != nil {
return nil, fmt.Errorf("failed to read CPU weight for cgroup %v: %v", cgroupPath, errWeight)
}
cpuShares := CpuWeightToCpuShares(cpuWeight)
return &ResourceConfig{CPUShares: &cpuShares, CPUQuota: &cpuLimit, CPUPeriod: &cpuPeriod}, nil
}
func getCgroupCpuConfig(cgroupPath string) (*ResourceConfig, error) {
if libcontainercgroups.IsCgroup2UnifiedMode() {
return getCgroupv2CpuConfig(cgroupPath)
} else {
return getCgroupv1CpuConfig(cgroupPath)
}
}
func getCgroupMemoryConfig(cgroupPath string) (*ResourceConfig, error) {
memLimitFile := "memory.limit_in_bytes"
if libcontainercgroups.IsCgroup2UnifiedMode() {
memLimitFile = "memory.max"
}
memLimit, err := fscommon.GetCgroupParamUint(cgroupPath, memLimitFile)
if err != nil {
return nil, fmt.Errorf("failed to read %s for cgroup %v: %v", memLimitFile, cgroupPath, err)
}
mLim := int64(memLimit)
//TODO(vinaykul,InPlacePodVerticalScaling): Add memory request support
return &ResourceConfig{Memory: &mLim}, nil
}
// Get the resource config values applied to the cgroup for specified resource type
func (m *cgroupManagerImpl) GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error) {
cgroupPaths := m.buildCgroupPaths(name) cgroupPaths := m.buildCgroupPaths(name)
cgroupMemoryPath, found := cgroupPaths["memory"] cgroupResourcePath, found := cgroupPaths[string(resource)]
if !found { if !found {
return fmt.Errorf("failed to build memory cgroup fs path for cgroup %v", name) return nil, fmt.Errorf("failed to build %v cgroup fs path for cgroup %v", resource, name)
} }
memLimit := strconv.FormatInt(memoryLimit, 10) switch resource {
if err := os.WriteFile(filepath.Join(cgroupMemoryPath, "memory.limit_in_bytes"), []byte(memLimit), 0700); err != nil { case v1.ResourceCPU:
return fmt.Errorf("failed to write %v to %v: %v", memLimit, cgroupMemoryPath, err) return getCgroupCpuConfig(cgroupResourcePath)
case v1.ResourceMemory:
return getCgroupMemoryConfig(cgroupResourcePath)
} }
return nil return nil, fmt.Errorf("unsupported resource %v for cgroup %v", resource, name)
} }
// Set the cpu quota, cpu period, and cpu shares applied to the cgroup func setCgroupv1CpuConfig(cgroupPath string, resourceConfig *ResourceConfig) error {
func (m *cgroupManagerImpl) SetCgroupCpuConfig(name CgroupName, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error {
var cpuQuotaStr, cpuPeriodStr, cpuSharesStr string var cpuQuotaStr, cpuPeriodStr, cpuSharesStr string
cgroupPaths := m.buildCgroupPaths(name) if resourceConfig.CPUQuota != nil {
cgroupCpuPath, found := cgroupPaths["cpu"] cpuQuotaStr = strconv.FormatInt(*resourceConfig.CPUQuota, 10)
if !found { if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.cfs_quota_us"), []byte(cpuQuotaStr), 0700); err != nil {
return fmt.Errorf("failed to build cpu cgroup fs path for cgroup %v", name) return fmt.Errorf("failed to write %v to %v: %v", cpuQuotaStr, cgroupPath, err)
}
if cpuQuota != nil {
cpuQuotaStr = strconv.FormatInt(*cpuQuota, 10)
if err := os.WriteFile(filepath.Join(cgroupCpuPath, "cpu.cfs_quota_us"), []byte(cpuQuotaStr), 0700); err != nil {
return fmt.Errorf("failed to write %v to %v: %v", cpuQuotaStr, cgroupCpuPath, err)
} }
} }
if cpuPeriod != nil { if resourceConfig.CPUPeriod != nil {
cpuPeriodStr = strconv.FormatUint(*cpuPeriod, 10) cpuPeriodStr = strconv.FormatUint(*resourceConfig.CPUPeriod, 10)
if err := os.WriteFile(filepath.Join(cgroupCpuPath, "cpu.cfs_period_us"), []byte(cpuPeriodStr), 0700); err != nil { if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.cfs_period_us"), []byte(cpuPeriodStr), 0700); err != nil {
return fmt.Errorf("failed to write %v to %v: %v", cpuPeriodStr, cgroupCpuPath, err) return fmt.Errorf("failed to write %v to %v: %v", cpuPeriodStr, cgroupPath, err)
} }
} }
if cpuShares != nil { if resourceConfig.CPUShares != nil {
cpuSharesStr = strconv.FormatUint(*cpuShares, 10) cpuSharesStr = strconv.FormatUint(*resourceConfig.CPUShares, 10)
if err := os.WriteFile(filepath.Join(cgroupCpuPath, "cpu.shares"), []byte(cpuSharesStr), 0700); err != nil { if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.shares"), []byte(cpuSharesStr), 0700); err != nil {
return fmt.Errorf("failed to write %v to %v: %v", cpuSharesStr, cgroupCpuPath, err) return fmt.Errorf("failed to write %v to %v: %v", cpuSharesStr, cgroupPath, err)
} }
} }
return nil return nil
} }
func setCgroupv2CpuConfig(cgroupPath string, resourceConfig *ResourceConfig) error {
if resourceConfig.CPUQuota != nil {
if resourceConfig.CPUPeriod == nil {
return fmt.Errorf("CpuPeriod must be specified in order to set CpuLimit")
}
cpuLimitStr := Cgroup2MaxCpuLimit
if *resourceConfig.CPUQuota > -1 {
cpuLimitStr = strconv.FormatInt(*resourceConfig.CPUQuota, 10)
}
cpuPeriodStr := strconv.FormatUint(*resourceConfig.CPUPeriod, 10)
cpuMaxStr := fmt.Sprintf("%s %s", cpuLimitStr, cpuPeriodStr)
if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.max"), []byte(cpuMaxStr), 0700); err != nil {
return fmt.Errorf("failed to write %v to %v: %v", cpuMaxStr, cgroupPath, err)
}
}
if resourceConfig.CPUShares != nil {
cpuWeight := CpuSharesToCpuWeight(*resourceConfig.CPUShares)
cpuWeightStr := strconv.FormatUint(cpuWeight, 10)
if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.weight"), []byte(cpuWeightStr), 0700); err != nil {
return fmt.Errorf("failed to write %v to %v: %v", cpuWeightStr, cgroupPath, err)
}
}
return nil
}
func setCgroupCpuConfig(cgroupPath string, resourceConfig *ResourceConfig) error {
if libcontainercgroups.IsCgroup2UnifiedMode() {
return setCgroupv2CpuConfig(cgroupPath, resourceConfig)
} else {
return setCgroupv1CpuConfig(cgroupPath, resourceConfig)
}
}
func setCgroupMemoryConfig(cgroupPath string, resourceConfig *ResourceConfig) error {
memLimitFile := "memory.limit_in_bytes"
if libcontainercgroups.IsCgroup2UnifiedMode() {
memLimitFile = "memory.max"
}
memLimit := strconv.FormatInt(*resourceConfig.Memory, 10)
if err := os.WriteFile(filepath.Join(cgroupPath, memLimitFile), []byte(memLimit), 0700); err != nil {
return fmt.Errorf("failed to write %v to %v/%v: %v", memLimit, cgroupPath, memLimitFile, err)
}
//TODO(vinaykul,InPlacePodVerticalScaling): Add memory request support
return nil
}
// Set resource config for the specified resource type on the cgroup
func (m *cgroupManagerImpl) SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error {
cgroupPaths := m.buildCgroupPaths(name)
cgroupResourcePath, found := cgroupPaths[string(resource)]
if !found {
return fmt.Errorf("failed to build %v cgroup fs path for cgroup %v", resource, name)
}
switch resource {
case v1.ResourceCPU:
return setCgroupCpuConfig(cgroupResourcePath, resourceConfig)
case v1.ResourceMemory:
return setCgroupMemoryConfig(cgroupResourcePath, resourceConfig)
}
return nil
}
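For readers less familiar with the cgroup v2 interface files, here is a minimal, self-contained sketch (illustrative only, not kubelet code) of the value formats the new helpers above read and write: cpu.max holds "<quota|max> <period>", cpu.weight holds a value in [1, 10000], and the two conversion functions implement the KEP-2254 phase-1 mapping to and from cgroup v1 cpu.shares. The parseCPUMax name and the sample strings are assumptions made for the example.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// cpuSharesToCPUWeight maps cgroup v1 cpu.shares [2..262144] to cgroup v2 cpu.weight [1..10000],
// using the same formula as CpuSharesToCpuWeight above.
func cpuSharesToCPUWeight(cpuShares uint64) uint64 {
    return (((cpuShares - 2) * 9999) / 262142) + 1
}

// cpuWeightToCPUShares is the reverse mapping (CpuWeightToCpuShares above).
func cpuWeightToCPUShares(cpuWeight uint64) uint64 {
    return (((cpuWeight - 1) * 262142) / 9999) + 2
}

// parseCPUMax parses the two-field cgroup v2 cpu.max format: "<quota|max> <period>".
// A quota of "max" means unlimited and is reported as -1, mirroring cgroup v1 cpu.cfs_quota_us.
func parseCPUMax(content string) (quota int64, period uint64, err error) {
    fields := strings.Fields(strings.TrimSpace(content))
    if len(fields) != 2 {
        return 0, 0, fmt.Errorf("unexpected cpu.max content %q", content)
    }
    quota = -1
    if fields[0] != "max" {
        if quota, err = strconv.ParseInt(fields[0], 10, 64); err != nil {
            return 0, 0, err
        }
    }
    if period, err = strconv.ParseUint(fields[1], 10, 64); err != nil {
        return 0, 0, err
    }
    return quota, period, nil
}

func main() {
    // "max 100000" is what an unlimited container shows; "50000 100000" is a 0.5 CPU limit.
    for _, sample := range []string{"max 100000", "50000 100000"} {
        quota, period, err := parseCPUMax(sample)
        fmt.Println(sample, "->", quota, period, err)
    }
    fmt.Println(cpuSharesToCPUWeight(1024)) // 1024 shares (one requested CPU on v1) -> weight 39
    fmt.Println(cpuWeightToCPUShares(39))   // ... which maps back to 998 shares
}

Note that the mapping is lossy in both directions because of the integer division: 1024 shares become weight 39, which maps back to 998 shares, and weight 2 becomes 28 shares, which maps back to weight 1, consistent with the unit test tables in the next file.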

View File

@ -169,3 +169,85 @@ func TestParseSystemdToCgroupName(t *testing.T) {
} }
} }
} }
func TestCpuSharesToCpuWeight(t *testing.T) {
testCases := []struct {
cpuShares uint64
expectedCpuWeight uint64
}{
{
cpuShares: 2,
expectedCpuWeight: 1,
},
{
cpuShares: 3,
expectedCpuWeight: 1,
},
{
cpuShares: 4,
expectedCpuWeight: 1,
},
{
cpuShares: 28,
expectedCpuWeight: 1,
},
{
cpuShares: 29,
expectedCpuWeight: 2,
},
{
cpuShares: 245,
expectedCpuWeight: 10,
},
{
cpuShares: 262144,
expectedCpuWeight: 10000,
},
}
for _, testCase := range testCases {
if actual := CpuSharesToCpuWeight(testCase.cpuShares); actual != testCase.expectedCpuWeight {
t.Errorf("cpuShares: %v, expectedCpuWeight: %v, actualCpuWeight: %v",
testCase.cpuShares, testCase.expectedCpuWeight, actual)
}
}
}
func TestCpuWeightToCpuShares(t *testing.T) {
testCases := []struct {
cpuWeight uint64
expectedCpuShares uint64
}{
{
cpuWeight: 1,
expectedCpuShares: 2,
},
{
cpuWeight: 2,
expectedCpuShares: 28,
},
{
cpuWeight: 3,
expectedCpuShares: 54,
},
{
cpuWeight: 4,
expectedCpuShares: 80,
},
{
cpuWeight: 245,
expectedCpuShares: 6398,
},
{
cpuWeight: 10000,
expectedCpuShares: 262144,
},
}
for _, testCase := range testCases {
if actual := CpuWeightToCpuShares(testCase.cpuWeight); actual != testCase.expectedCpuShares {
t.Errorf("cpuWeight: %v, expectedCpuShares: %v, actualCpuShares: %v",
testCase.cpuWeight, testCase.expectedCpuShares, actual)
}
}
}

View File

@ -19,7 +19,11 @@ limitations under the License.
package cm package cm
import "errors" import (
"errors"
v1 "k8s.io/api/core/v1"
)
type unsupportedCgroupManager struct{} type unsupportedCgroupManager struct{}
@ -77,19 +81,11 @@ func (m *unsupportedCgroupManager) ReduceCPULimits(cgroupName CgroupName) error
return nil return nil
} }
func (m *unsupportedCgroupManager) GetCgroupMemoryConfig(name CgroupName) (uint64, error) { func (m *unsupportedCgroupManager) GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error) {
return 0, errNotSupported return nil, errNotSupported
} }
func (m *unsupportedCgroupManager) GetCgroupCpuConfig(name CgroupName) (int64, uint64, uint64, error) { func (m *unsupportedCgroupManager) SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error {
return 0, 0, 0, errNotSupported
}
func (m *unsupportedCgroupManager) SetCgroupMemoryConfig(name CgroupName, memoryLimit int64) error {
return errNotSupported
}
func (m *unsupportedCgroupManager) SetCgroupCpuConfig(name CgroupName, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error {
return errNotSupported return errNotSupported
} }

View File

@ -95,19 +95,11 @@ func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceLi
return cm.extendedPluginResources, cm.extendedPluginResources, []string{} return cm.extendedPluginResources, cm.extendedPluginResources, []string{}
} }
func (m *podContainerManagerStub) GetPodCgroupMemoryConfig(_ *v1.Pod) (uint64, error) { func (m *podContainerManagerStub) GetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName) (*ResourceConfig, error) {
return 0, nil return nil, nil
} }
func (m *podContainerManagerStub) GetPodCgroupCpuConfig(_ *v1.Pod) (int64, uint64, uint64, error) { func (m *podContainerManagerStub) SetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName, _ *ResourceConfig) error {
return 0, 0, 0, nil
}
func (m *podContainerManagerStub) SetPodCgroupMemoryConfig(_ *v1.Pod, _ int64) error {
return nil
}
func (m *podContainerManagerStub) SetPodCgroupCpuConfig(_ *v1.Pod, _ *int64, _, _ *uint64) error {
return nil return nil
} }

View File

@ -382,6 +382,10 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
return 0 return 0
} }
cpuQuantity := container.Resources.Requests[v1.ResourceCPU] cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
// In-place pod resize feature makes Container.Resources field mutable for CPU & memory.
// ResourcesAllocated holds the value of Container.Resources.Requests when the pod was admitted.
// We should return this value because this is what kubelet agreed to allocate for the container
// and the value configured with runtime.
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
cpuQuantity = cs.ResourcesAllocated[v1.ResourceCPU] cpuQuantity = cs.ResourcesAllocated[v1.ResourceCPU]
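The same preference for the admitted allocation over the (now mutable) spec request shows up again in the memory manager below. Consolidated into one hypothetical helper, assuming the imports already used in this file (v1, resource, podutil, utilfeature, features), the lookup reads:

// pickCPURequest is illustrative only: it returns the CPU quantity the kubelet
// actually allocated for the container when in-place resize is enabled and a
// container status exists, and falls back to the spec request otherwise.
func pickCPURequest(pod *v1.Pod, container *v1.Container) resource.Quantity {
    cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
    if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
        if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
            cpuQuantity = cs.ResourcesAllocated[v1.ResourceCPU]
        }
    }
    return cpuQuantity
}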

View File

@ -112,30 +112,16 @@ func (cm *FakePodContainerManager) GetPodCgroupMemoryUsage(_ *v1.Pod) (uint64, e
return 0, nil return 0, nil
} }
func (cm *FakePodContainerManager) GetPodCgroupMemoryConfig(_ *v1.Pod) (uint64, error) { func (cm *FakePodContainerManager) GetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName) (*ResourceConfig, error) {
cm.Lock() cm.Lock()
defer cm.Unlock() defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupMemoryConfig") cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupConfig")
return 0, nil return nil, nil
} }
func (cm *FakePodContainerManager) GetPodCgroupCpuConfig(_ *v1.Pod) (int64, uint64, uint64, error) { func (cm *FakePodContainerManager) SetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName, _ *ResourceConfig) error {
cm.Lock() cm.Lock()
defer cm.Unlock() defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupCpuConfig") cm.CalledFunctions = append(cm.CalledFunctions, "SetPodCgroupConfig")
return 0, 0, 0, nil
}
func (cm *FakePodContainerManager) SetPodCgroupMemoryConfig(_ *v1.Pod, _ int64) error {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "SetPodCgroupMemoryConfig")
return nil
}
func (cm *FakePodContainerManager) SetPodCgroupCpuConfig(_ *v1.Pod, _ *int64, _, _ *uint64) error {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "SetPodCgroupCpuConfig")
return nil return nil
} }

View File

@ -414,6 +414,10 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
func getRequestedResources(pod *v1.Pod, container *v1.Container) (map[v1.ResourceName]uint64, error) { func getRequestedResources(pod *v1.Pod, container *v1.Container) (map[v1.ResourceName]uint64, error) {
requestedResources := map[v1.ResourceName]uint64{} requestedResources := map[v1.ResourceName]uint64{}
resources := container.Resources.Requests resources := container.Resources.Requests
// In-place pod resize feature makes Container.Resources field mutable for CPU & memory.
// ResourcesAllocated holds the value of Container.Resources.Requests when the pod was admitted.
// We should return this value because this is what kubelet agreed to allocate for the container
// and the value configured with runtime.
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
resources = cs.ResourcesAllocated resources = cs.ResourcesAllocated

View File

@ -129,24 +129,14 @@ func (m *podContainerManagerImpl) GetPodCgroupMemoryUsage(pod *v1.Pod) (uint64,
return uint64(memUsage), nil return uint64(memUsage), nil
} }
func (m *podContainerManagerImpl) GetPodCgroupMemoryConfig(pod *v1.Pod) (uint64, error) { func (m *podContainerManagerImpl) GetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName) (*ResourceConfig, error) {
podCgroupName, _ := m.GetPodContainerName(pod) podCgroupName, _ := m.GetPodContainerName(pod)
return m.cgroupManager.GetCgroupMemoryConfig(podCgroupName) return m.cgroupManager.GetCgroupConfig(podCgroupName, resource)
} }
func (m *podContainerManagerImpl) GetPodCgroupCpuConfig(pod *v1.Pod) (int64, uint64, uint64, error) { func (m *podContainerManagerImpl) SetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName, resourceConfig *ResourceConfig) error {
podCgroupName, _ := m.GetPodContainerName(pod) podCgroupName, _ := m.GetPodContainerName(pod)
return m.cgroupManager.GetCgroupCpuConfig(podCgroupName) return m.cgroupManager.SetCgroupConfig(podCgroupName, resource, resourceConfig)
}
func (m *podContainerManagerImpl) SetPodCgroupMemoryConfig(pod *v1.Pod, memoryLimit int64) error {
podCgroupName, _ := m.GetPodContainerName(pod)
return m.cgroupManager.SetCgroupMemoryConfig(podCgroupName, memoryLimit)
}
func (m *podContainerManagerImpl) SetPodCgroupCpuConfig(pod *v1.Pod, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error {
podCgroupName, _ := m.GetPodContainerName(pod)
return m.cgroupManager.SetCgroupCpuConfig(podCgroupName, cpuQuota, cpuPeriod, cpuShares)
} }
// Kill one process ID // Kill one process ID
@ -356,18 +346,10 @@ func (m *podContainerManagerNoop) GetPodCgroupMemoryUsage(_ *v1.Pod) (uint64, er
return 0, nil return 0, nil
} }
func (m *podContainerManagerNoop) GetPodCgroupMemoryConfig(_ *v1.Pod) (uint64, error) { func (m *podContainerManagerNoop) GetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName) (*ResourceConfig, error) {
return 0, nil return nil, nil
} }
func (m *podContainerManagerNoop) GetPodCgroupCpuConfig(_ *v1.Pod) (int64, uint64, uint64, error) { func (m *podContainerManagerNoop) SetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName, _ *ResourceConfig) error {
return 0, 0, 0, nil
}
func (m *podContainerManagerNoop) SetPodCgroupMemoryConfig(_ *v1.Pod, _ int64) error {
return nil
}
func (m *podContainerManagerNoop) SetPodCgroupCpuConfig(_ *v1.Pod, _ *int64, _, _ *uint64) error {
return nil return nil
} }

View File

@ -84,14 +84,10 @@ type CgroupManager interface {
ReduceCPULimits(cgroupName CgroupName) error ReduceCPULimits(cgroupName CgroupName) error
// MemoryUsage returns current memory usage of the specified cgroup, as read from the cgroupfs. // MemoryUsage returns current memory usage of the specified cgroup, as read from the cgroupfs.
MemoryUsage(name CgroupName) (int64, error) MemoryUsage(name CgroupName) (int64, error)
// GetCgroupMemoryConfig returns the memory limit of the specified cgroup as read from cgroup fs. // Get the resource config values applied to the cgroup for specified resource type
GetCgroupMemoryConfig(name CgroupName) (uint64, error) GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error)
// GetCgroupCpuConfig returns the cpu quota, cpu period, and cpu shares of the specified cgroup as read from cgroup fs. // Set resource config for the specified resource type on the cgroup
GetCgroupCpuConfig(name CgroupName) (int64, uint64, uint64, error) SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error
// SetCgroupMemoryConfig sets the memory limit of the specified cgroup.
SetCgroupMemoryConfig(name CgroupName, memoryLimit int64) error
// SetCgroupCpuConfig sets the cpu quota, cpu period, and cpu shares of the specified cgroup.
SetCgroupCpuConfig(name CgroupName, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error
} }
// QOSContainersInfo stores the names of containers per qos // QOSContainersInfo stores the names of containers per qos
@ -128,18 +124,12 @@ type PodContainerManager interface {
// IsPodCgroup returns true if the literal cgroupfs name corresponds to a pod // IsPodCgroup returns true if the literal cgroupfs name corresponds to a pod
IsPodCgroup(cgroupfs string) (bool, types.UID) IsPodCgroup(cgroupfs string) (bool, types.UID)
// Get value of memory.usage_in_bytes for the pod Cgroup // Get value of memory usage for the pod Cgroup
GetPodCgroupMemoryUsage(pod *v1.Pod) (uint64, error) GetPodCgroupMemoryUsage(pod *v1.Pod) (uint64, error)
// Get value of memory.limit_in_bytes for the pod Cgroup // Get the resource config values applied to the pod cgroup for specified resource type
GetPodCgroupMemoryConfig(pod *v1.Pod) (uint64, error) GetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName) (*ResourceConfig, error)
// Get values of cpu.cfs_quota_us, cpu.cfs_period_us, and cpu.shares for the pod Cgroup // Set resource config values for the specified resource type on the pod cgroup
GetPodCgroupCpuConfig(pod *v1.Pod) (int64, uint64, uint64, error) SetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName, resourceConfig *ResourceConfig) error
// Set value of memory.limit_in_bytes for the pod Cgroup
SetPodCgroupMemoryConfig(pod *v1.Pod, memoryLimit int64) error
// Set values of cpu.cfs_quota_us, cpu.cfs_period_us, and cpu.shares for the pod Cgroup
SetPodCgroupCpuConfig(pod *v1.Pod, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error
} }
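The consolidated pair replaces four resource-specific getters and setters with one getter and one setter keyed by v1.ResourceName, with all values carried in a ResourceConfig. A minimal sketch of a caller doing a read-modify-write against it, mirroring the usage guard that doPodResizeAction applies later in this commit (the function name and the no-op shortcut are assumptions; written as if inside the cm package with fmt imported):

// lowerPodMemoryLimit is illustrative only: read the pod cgroup's current memory
// limit, refuse to shrink it below current usage, then write the new limit.
func lowerPodMemoryLimit(pcm PodContainerManager, pod *v1.Pod, newLimit int64) error {
    current, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory)
    if err != nil {
        return err
    }
    if current.Memory != nil && *current.Memory == newLimit {
        return nil // already at the desired limit
    }
    usage, err := pcm.GetPodCgroupMemoryUsage(pod)
    if err != nil {
        return err
    }
    if newLimit > 0 && uint64(newLimit) < usage {
        return fmt.Errorf("refusing to set pod %s memory limit below its current usage", pod.Name)
    }
    return pcm.SetPodCgroupConfig(pod, v1.ResourceMemory, &ResourceConfig{Memory: &newLimit})
}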

View File

@ -327,13 +327,13 @@ type PodStatus struct {
// ContainerResources represents the Resources allocated to the running container. // ContainerResources represents the Resources allocated to the running container.
type ContainerResources struct { type ContainerResources struct {
// CPU capacity reserved for the container (cpu.shares) // CPU capacity reserved for the container
CPURequest *resource.Quantity CPURequest *resource.Quantity
// CPU limit enforced on the container (cpu.cfs_quota_us) // CPU limit enforced on the container
CPULimit *resource.Quantity CPULimit *resource.Quantity
// Memory capaacity reserved for the container // Memory capaacity reserved for the container
MemoryRequest *resource.Quantity MemoryRequest *resource.Quantity
// Memory limit enforced on the container (memory.limit_in_bytes) // Memory limit enforced on the container
MemoryLimit *resource.Quantity MemoryLimit *resource.Quantity
} }

View File

@ -31,8 +31,11 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/features"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
) )
@ -2153,13 +2156,15 @@ func TestEvictonMessageWithResourceResize(t *testing.T) {
result, found := stats[pod] result, found := stats[pod]
return result, found return result, found
} }
threshold := []evictionapi.Threshold{}
observations := signalObservations{}
for _, enabled := range []bool{true, false} { for _, enabled := range []bool{true, false} {
t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) { t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled)()
msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn) msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn, threshold, observations)
if enabled { if enabled {
if !strings.Contains(msg, "testcontainer was using 150Mi, which exceeds its request of 100Mi") { if !strings.Contains(msg, "testcontainer was using 150Mi, request is 100Mi") {
t.Errorf("Expected 'exceeds memory' eviction message was not found.") t.Errorf("Expected 'exceeds memory' eviction message was not found.")
} }
} else { } else {

View File

@ -1861,7 +1861,7 @@ func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType,
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) {
// While resize is in progress, periodically call PLEG to update pod cache // While resize is in progress, periodically call PLEG to update pod cache
runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod)) klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod))
return false, err return false, err
} }
@ -2534,7 +2534,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus)
cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU) cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory) memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)
if cpuRequests > cpuAvailable || memRequests > memAvailable { if cpuRequests > cpuAvailable || memRequests > memAvailable {
klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "Pod", pod.Name) klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", pod.Name)
return false, nil, v1.PodResizeStatusInfeasible return false, nil, v1.PodResizeStatusInfeasible
} }
@ -2549,7 +2549,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus)
if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, pod); !ok { if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, pod); !ok {
// Log reason and return. Let the next sync iteration retry the resize // Log reason and return. Let the next sync iteration retry the resize
klog.V(3).InfoS("Resize cannot be accommodated", "Pod", pod.Name, "Reason", failReason, "Message", failMessage) klog.V(3).InfoS("Resize cannot be accommodated", "pod", pod.Name, "reason", failReason, "message", failMessage)
return false, nil, v1.PodResizeStatusDeferred return false, nil, v1.PodResizeStatusDeferred
} }
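Taken together, the two fragments above form a small decision ladder inside canResizePod. A hedged sketch of that ladder, with the inputs reduced to booleans for clarity (the status used on the success path is not visible in this hunk, so returning PodResizeStatusInProgress there is an assumption):

// classifyResize is an illustrative reduction of canResizePod's decision ladder:
//   request exceeds node allocatable            -> Infeasible (will never fit on this node)
//   pod no longer admits with other active pods -> Deferred   (retry on a later sync)
//   otherwise                                   -> proceed with the resize
func classifyResize(fitsAllocatable, admissible bool) v1.PodResizeStatus {
    switch {
    case !fitsAllocatable:
        return v1.PodResizeStatusInfeasible
    case !admissible:
        return v1.PodResizeStatusDeferred
    default:
        return v1.PodResizeStatusInProgress // assumption: the success path continues as in-progress
    }
}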

View File

@ -366,7 +366,8 @@ func (m *kubeGenericRuntimeManager) updateContainerResources(pod *v1.Pod, contai
if containerResources == nil { if containerResources == nil {
return fmt.Errorf("container %q updateContainerResources failed: cannot generate resources config", containerID.String()) return fmt.Errorf("container %q updateContainerResources failed: cannot generate resources config", containerID.String())
} }
err := m.runtimeService.UpdateContainerResources(containerID.ID, containerResources) ctx := context.Background()
err := m.runtimeService.UpdateContainerResources(ctx, containerID.ID, containerResources)
if err != nil { if err != nil {
klog.ErrorS(err, "UpdateContainerResources failed", "container", containerID.String()) klog.ErrorS(err, "UpdateContainerResources failed", "container", containerID.String())
} }

View File

@ -865,15 +865,16 @@ func TestGenerateLinuxContainerResources(t *testing.T) {
if tc.scalingFg { if tc.scalingFg {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)()
} }
tc.expected.HugepageLimits = []*runtimeapi.HugepageLimit{{PageSize: "2MB", Limit: 0}, {PageSize: "1GB", Limit: 0}}
pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests} pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests}
if len(tc.cStatus) > 0 { if len(tc.cStatus) > 0 {
pod.Status.ContainerStatuses = tc.cStatus pod.Status.ContainerStatuses = tc.cStatus
} }
resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false) resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false)
tc.expected.HugepageLimits = resources.HugepageLimits
if diff.ObjectDiff(resources, tc.expected) != "" { if diff.ObjectDiff(resources, tc.expected) != "" {
t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources) t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources)
} }
}) })
} }
//TODO(vinaykul,InPlacePodVerticalScaling): Add unit tests for cgroup v1 & v2
} }

View File

@ -828,7 +828,8 @@ func TestUpdateContainerResources(t *testing.T) {
_, fakeContainers := makeAndSetFakePod(t, m, fakeRuntime, pod) _, fakeContainers := makeAndSetFakePod(t, m, fakeRuntime, pod)
assert.Equal(t, len(fakeContainers), 1) assert.Equal(t, len(fakeContainers), 1)
cStatus, err := m.getPodContainerStatuses(pod.UID, pod.Name, pod.Namespace) ctx := context.Background()
cStatus, err := m.getPodContainerStatuses(ctx, pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err) assert.NoError(t, err)
containerID := cStatus[0].ID containerID := cStatus[0].ID

View File

@ -647,13 +647,15 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *ku
var err error var err error
switch rName { switch rName {
case v1.ResourceCPU: case v1.ResourceCPU:
podCpuResources := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod}
if setLimitValue == true { if setLimitValue == true {
err = pcm.SetPodCgroupCpuConfig(pod, podResources.CpuQuota, podResources.CpuPeriod, nil) podCpuResources.CPUQuota = podResources.CPUQuota
} else { } else {
err = pcm.SetPodCgroupCpuConfig(pod, nil, podResources.CpuPeriod, podResources.CpuShares) podCpuResources.CPUShares = podResources.CPUShares
} }
err = pcm.SetPodCgroupConfig(pod, rName, podCpuResources)
case v1.ResourceMemory: case v1.ResourceMemory:
err = pcm.SetPodCgroupMemoryConfig(pod, *podResources.Memory) err = pcm.SetPodCgroupConfig(pod, rName, podResources)
} }
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name) klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name)
@ -693,9 +695,9 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *ku
return err return err
} }
if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources { if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources {
currentPodMemoryLimit, err := pcm.GetPodCgroupMemoryConfig(pod) currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory)
if err != nil { if err != nil {
klog.ErrorS(err, "GetPodCgroupMemoryConfig failed", "pod", pod.Name) klog.ErrorS(err, "GetPodCgroupConfig for memory failed", "pod", pod.Name)
result.Fail(err) result.Fail(err)
return return
} }
@ -710,20 +712,20 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *ku
result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name)) result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name))
return return
} }
if errResize := resizeContainers(v1.ResourceMemory, int64(currentPodMemoryLimit), *podResources.Memory, 0, 0); errResize != nil { if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil {
result.Fail(errResize) result.Fail(errResize)
return return
} }
} }
if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources { if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources {
currentPodCpuQuota, _, currentPodCPUShares, err := pcm.GetPodCgroupCpuConfig(pod) currentPodCpuConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceCPU)
if err != nil { if err != nil {
klog.ErrorS(err, "GetPodCgroupCpuConfig failed", "pod", pod.Name) klog.ErrorS(err, "GetPodCgroupConfig for CPU failed", "pod", pod.Name)
result.Fail(err) result.Fail(err)
return return
} }
if errResize := resizeContainers(v1.ResourceCPU, currentPodCpuQuota, *podResources.CpuQuota, if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota,
int64(currentPodCPUShares), int64(*podResources.CpuShares)); errResize != nil { int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil {
result.Fail(errResize) result.Fail(errResize)
return return
} }
@ -780,7 +782,7 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res
} }
// computePodActions checks whether the pod spec has changed and returns the changes if true. // computePodActions checks whether the pod spec has changed and returns the changes if true.
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod)) klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))
createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus) createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)
@ -873,7 +875,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo) changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo)
latestPodStatus, err := m.GetPodStatus(podStatus.ID, pod.Name, pod.Namespace) latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace)
if err == nil { if err == nil {
podStatus = latestPodStatus podStatus = latestPodStatus
} }
@ -982,7 +984,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku
// 8. Create normal containers. // 8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes. // Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus) podContainerChanges := m.computePodActions(ctx, pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod)) klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox { if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod) ref, err := ref.GetReference(legacyscheme.Scheme, pod)
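In the CPU branch above, CPUPeriod is always carried along and the update then chooses between the quota (the limit half) and the shares (the request half), because a resize may change only one of the two; the memory branch passes the desired config straight through. A compact sketch of that split (helper name is hypothetical; podResources is the desired pod-level config, and the cm import is the one already used in this file):

// setPodCPUCgroup is illustrative only: it writes either the CPU limit or the CPU
// request portion of the desired pod-level cgroup config via SetPodCgroupConfig.
func setPodCPUCgroup(pcm cm.PodContainerManager, pod *v1.Pod, podResources *cm.ResourceConfig, setLimit bool) error {
    cfg := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod}
    if setLimit {
        cfg.CPUQuota = podResources.CPUQuota // cpu.cfs_quota_us, or the quota half of cpu.max on cgroup v2
    } else {
        cfg.CPUShares = podResources.CPUShares // cpu.shares, converted to cpu.weight on cgroup v2
    }
    return pcm.SetPodCgroupConfig(pod, v1.ResourceCPU, cfg)
}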

View File

@ -1174,7 +1174,8 @@ func TestComputePodActions(t *testing.T) {
if test.mutateStatusFn != nil { if test.mutateStatusFn != nil {
test.mutateStatusFn(status) test.mutateStatusFn(status)
} }
actions := m.computePodActions(pod, status) ctx := context.Background()
actions := m.computePodActions(ctx, pod, status)
verifyActions(t, &test.actions, &actions, desc) verifyActions(t, &test.actions, &actions, desc)
if test.resetStatusFn != nil { if test.resetStatusFn != nil {
test.resetStatusFn(status) test.resetStatusFn(status)
@ -1389,7 +1390,8 @@ func TestComputePodActionsWithInitContainers(t *testing.T) {
if test.mutateStatusFn != nil { if test.mutateStatusFn != nil {
test.mutateStatusFn(status) test.mutateStatusFn(status)
} }
actions := m.computePodActions(pod, status) ctx := context.Background()
actions := m.computePodActions(ctx, pod, status)
verifyActions(t, &test.actions, &actions, desc) verifyActions(t, &test.actions, &actions, desc)
} }
} }
@ -1571,7 +1573,8 @@ func TestComputePodActionsWithInitAndEphemeralContainers(t *testing.T) {
if test.mutateStatusFn != nil { if test.mutateStatusFn != nil {
test.mutateStatusFn(status) test.mutateStatusFn(status)
} }
actions := m.computePodActions(pod, status) ctx := context.Background()
actions := m.computePodActions(ctx, pod, status)
verifyActions(t, &test.actions, &actions, desc) verifyActions(t, &test.actions, &actions, desc)
} }
} }
@ -1964,7 +1967,8 @@ func TestComputePodActionsForPodResize(t *testing.T) {
} }
} }
makeAndSetFakePod(t, m, fakeRuntime, pod) makeAndSetFakePod(t, m, fakeRuntime, pod)
status, _ := m.GetPodStatus(kps.ID, pod.Name, pod.Namespace) ctx := context.Background()
status, _ := m.GetPodStatus(ctx, kps.ID, pod.Name, pod.Namespace)
for idx := range pod.Spec.Containers { for idx := range pod.Spec.Containers {
if rcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); rcs != nil { if rcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); rcs != nil {
if csIdx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[idx].Name); found { if csIdx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[idx].Name); found {
@ -1982,7 +1986,7 @@ func TestComputePodActionsForPodResize(t *testing.T) {
test.mutatePodFn(pod) test.mutatePodFn(pod)
} }
expectedActions := test.getExpectedPodActionsFn(pod, status) expectedActions := test.getExpectedPodActionsFn(pod, status)
actions := m.computePodActions(pod, status) actions := m.computePodActions(ctx, pod, status)
verifyActions(t, expectedActions, &actions, desc) verifyActions(t, expectedActions, &actions, desc)
} }
} }

View File

@ -409,3 +409,7 @@ func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodS
} }
} }
} }
func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
return fmt.Errorf("not implemented"), false
}

View File

@ -482,14 +482,15 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
return err, g.cache.Set(pod.ID, status, err, timestamp) return err, g.cache.Set(pod.ID, status, err, timestamp)
} }
func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) error { func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
ctx := context.Background()
if !g.cacheEnabled() { if !g.cacheEnabled() {
return fmt.Errorf("pod cache disabled") return fmt.Errorf("pod cache disabled"), false
} }
if pod == nil { if pod == nil {
return fmt.Errorf("pod cannot be nil") return fmt.Errorf("pod cannot be nil"), false
} }
return g.updateCache(pod, pid) return g.updateCache(ctx, pod, pid)
} }
func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) { func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {

View File

@ -69,5 +69,5 @@ type PodLifecycleEventGenerator interface {
Watch() chan *PodLifecycleEvent Watch() chan *PodLifecycleEvent
Healthy() (bool, error) Healthy() (bool, error)
Relist() Relist()
UpdateCache(*kubecontainer.Pod, types.UID) error UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
} }

View File

@ -19,6 +19,7 @@ package prober
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"sync" "sync"
@ -80,10 +81,16 @@ func TestTCPPortExhaustion(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(fmt.Sprintf(tt.name), func(t *testing.T) { t.Run(fmt.Sprintf(tt.name), func(t *testing.T) {
testRootDir := ""
if tempDir, err := ioutil.TempDir("", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
} else {
testRootDir = tempDir
}
podManager := kubepod.NewBasicPodManager(nil) podManager := kubepod.NewBasicPodManager(nil)
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager( m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker), status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
results.NewManager(), results.NewManager(),
results.NewManager(), results.NewManager(),
results.NewManager(), results.NewManager(),

View File

@ -38,10 +38,12 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
listersv1 "k8s.io/client-go/listers/core/v1" listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/internal/heap" "k8s.io/kubernetes/pkg/scheduler/internal/heap"
@ -683,14 +685,47 @@ func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
p.lock.Unlock() p.lock.Unlock()
} }
// isPodResourcesResizedDown returns true if a pod CPU and/or memory resize request has been
// admitted by kubelet, is 'InProgress', and results in a net sizing down of updated resources.
// It returns false if either CPU or memory resource is net resized up, or if no resize is in progress.
func isPodResourcesResizedDown(pod *v1.Pod) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// TODO(vinaykul,wangchen615,InPlacePodVerticalScaling): Fix this to determine when a
// pod is truly resized down (might need oldPod if we cannot determine from Status alone)
if pod.Status.Resize == v1.PodResizeStatusInProgress {
return true
}
}
return false
}
// AssignedPodUpdated is called when a bound pod is updated. Change of labels // AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable. // may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
p.lock.Lock() p.lock.Lock()
if isPodResourcesResizedDown(pod) {
p.moveAllToActiveOrBackoffQueue(AssignedPodUpdate, nil)
} else {
p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate) p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate)
}
p.lock.Unlock() p.lock.Unlock()
} }
// NOTE: this function assumes a lock has been acquired in the caller.
// moveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) {
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap))
for _, pInfo := range p.unschedulablePods.podInfoMap {
if preCheck == nil || preCheck(pInfo.Pod) {
unschedulablePods = append(unschedulablePods, pInfo)
}
}
p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
}
// MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ. // MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that // This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the // if Pop() is waiting for an item, it receives the signal after all the pods are in the
@ -698,13 +733,7 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) { func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap)) p.moveAllToActiveOrBackoffQueue(event, preCheck)
for _, pInfo := range p.unschedulablePods.podInfoMap {
if preCheck == nil || preCheck(pInfo.Pod) {
unschedulablePods = append(unschedulablePods, pInfo)
}
}
p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
} }
// NOTE: this function assumes lock has been acquired in caller // NOTE: this function assumes lock has been acquired in caller
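The split above follows the queue's locking convention: the exported MoveAllToActiveOrBackoffQueue acquires p.lock and delegates, while the new unexported moveAllToActiveOrBackoffQueue assumes the caller already holds it, which lets AssignedPodUpdated reuse it under its own lock without deadlocking. A generic illustration of that convention (all names here are invented for the example; this is not scheduler code):

package example

import "sync"

// Queue demonstrates the exported-locks / unexported-assumes-locked convention.
type Queue struct {
    mu    sync.Mutex
    items []string
}

// MoveAll is the exported entry point: it acquires the lock and delegates.
func (q *Queue) MoveAll() {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.moveAll()
}

// moveAll does the work and assumes q.mu is held by the caller, so other
// already-locked paths (such as an update handler) can reuse it safely.
func (q *Queue) moveAll() {
    q.items = q.items[:0]
}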

View File

@ -19,6 +19,7 @@ package node
import ( import (
"context" "context"
"fmt" "fmt"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -29,16 +30,19 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/featuregate"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features" resourceapi "k8s.io/kubernetes/pkg/api/v1/resource"
kubecm "k8s.io/kubernetes/pkg/kubelet/cm" kubecm "k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
semver "github.com/blang/semver/v4"
"github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" "github.com/onsi/gomega"
) )
@ -52,6 +56,7 @@ const (
Cgroupv2MemRequest string = "/sys/fs/cgroup/memory.min" Cgroupv2MemRequest string = "/sys/fs/cgroup/memory.min"
Cgroupv2CPULimit string = "/sys/fs/cgroup/cpu.max" Cgroupv2CPULimit string = "/sys/fs/cgroup/cpu.max"
Cgroupv2CPURequest string = "/sys/fs/cgroup/cpu.weight" Cgroupv2CPURequest string = "/sys/fs/cgroup/cpu.weight"
CpuPeriod string = "100000"
PollInterval time.Duration = 2 * time.Second PollInterval time.Duration = 2 * time.Second
PollTimeout time.Duration = 4 * time.Minute PollTimeout time.Duration = 4 * time.Minute
@ -74,13 +79,24 @@ type TestContainerInfo struct {
RestartCount int32 RestartCount int32
} }
func isFeatureGatePostAlpha() bool { func isInPlaceResizeSupportedByRuntime(c clientset.Interface, nodeName string) bool {
if fs, found := utilfeature.DefaultFeatureGate.DeepCopy().GetAll()[features.InPlacePodVerticalScaling]; found { //TODO(vinaykul,InPlacePodVerticalScaling): Can we optimize this?
if fs.PreRelease == featuregate.Alpha { node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return false return false
} }
re := regexp.MustCompile("containerd://(.*)")
match := re.FindStringSubmatch(node.Status.NodeInfo.ContainerRuntimeVersion)
if len(match) != 2 {
return false
}
if ver, verr := semver.ParseTolerant(match[1]); verr == nil {
if ver.Compare(semver.MustParse("1.6.9")) < 0 {
return false
} }
return true return true
}
return false
} }
func getTestResourceInfo(tcInfo TestContainerInfo) (v1.ResourceRequirements, v1.ResourceList, []v1.ContainerResizePolicy) { func getTestResourceInfo(tcInfo TestContainerInfo) (v1.ResourceRequirements, v1.ResourceList, []v1.ContainerResizePolicy) {
@ -288,8 +304,9 @@ func verifyPodStatusResources(pod *v1.Pod, tcInfo []TestContainerInfo) {
func isPodOnCgroupv2Node(pod *v1.Pod) bool { func isPodOnCgroupv2Node(pod *v1.Pod) bool {
// Determine if pod is running on cgroupv2 or cgroupv1 node // Determine if pod is running on cgroupv2 or cgroupv1 node
//TODO(vinaykul,InPlacePodVerticalScaling): Is there a better way to determine this?
cgroupv2File := "/sys/fs/cgroup/cgroup.controllers" cgroupv2File := "/sys/fs/cgroup/cgroup.controllers"
_, err := framework.RunKubectl(pod.Namespace, "exec", pod.Name, "--", "ls", cgroupv2File) _, err := e2ekubectl.RunKubectl(pod.Namespace, "exec", pod.Name, "--", "ls", cgroupv2File)
if err == nil { if err == nil {
return true return true
} }
@ -308,9 +325,12 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl
} }
verifyCgroupValue := func(cName, cgPath, expectedCgValue string) bool { verifyCgroupValue := func(cName, cgPath, expectedCgValue string) bool {
cmd := []string{"head", "-n", "1", cgPath} cmd := []string{"head", "-n", "1", cgPath}
cgValue, err := framework.LookForStringInPodExecToContainer(pod.Namespace, pod.Name, cName, cmd, expectedCgValue, PollTimeout) framework.Logf("Namespace %s Pod %s Container %s - looking for cgroup value %s in path %s",
pod.Namespace, pod.Name, cName, expectedCgValue, cgPath)
cgValue, err := e2epodoutput.LookForStringInPodExecToContainer(pod.Namespace, pod.Name, cName, cmd, expectedCgValue, PollTimeout)
if flagError { if flagError {
framework.ExpectNoError(err, "failed to find expected cgroup value in container") framework.ExpectNoError(err, fmt.Sprintf("failed to find expected value '%s' in container cgroup '%s'",
expectedCgValue, cgPath))
} }
cgValue = strings.Trim(cgValue, "\n") cgValue = strings.Trim(cgValue, "\n")
if flagError { if flagError {
@ -342,9 +362,12 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl
cpuQuota = -1 cpuQuota = -1
} }
cpuLimitString = strconv.FormatInt(cpuQuota, 10) cpuLimitString = strconv.FormatInt(cpuQuota, 10)
if podOnCgroupv2Node && cpuLimitString == "-1" { if podOnCgroupv2Node {
if cpuLimitString == "-1" {
cpuLimitString = "max" cpuLimitString = "max"
} }
cpuLimitString = fmt.Sprintf("%s %s", cpuLimitString, CpuPeriod)
}
memLimitString = strconv.FormatInt(memLimitInBytes, 10) memLimitString = strconv.FormatInt(memLimitInBytes, 10)
if podOnCgroupv2Node && memLimitString == "0" { if podOnCgroupv2Node && memLimitString == "0" {
memLimitString = "max" memLimitString = "max"
@ -357,6 +380,10 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl
if !verifyCgroupValue(ci.Name, cgroupCPULimit, cpuLimitString) { if !verifyCgroupValue(ci.Name, cgroupCPULimit, cpuLimitString) {
return false return false
} }
if podOnCgroupv2Node {
// convert cgroup v1 cpu.shares value to cgroup v2 cpu.weight value
cpuShares = int64(1 + ((cpuShares-2)*9999)/262142)
}
if !verifyCgroupValue(ci.Name, cgroupCPURequest, strconv.FormatInt(cpuShares, 10)) { if !verifyCgroupValue(ci.Name, cgroupCPURequest, strconv.FormatInt(cpuShares, 10)) {
return false return false
} }
@ -365,7 +392,7 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl
return true return true
} }
func waitForPodResizeActuation(podClient *framework.PodClient, pod, patchedPod *v1.Pod, expectedContainers []TestContainerInfo) *v1.Pod { func waitForPodResizeActuation(c clientset.Interface, podClient *e2epod.PodClient, pod, patchedPod *v1.Pod, expectedContainers []TestContainerInfo) *v1.Pod {
waitForContainerRestart := func() error { waitForContainerRestart := func() error {
var restartContainersExpected []string var restartContainersExpected []string
@ -443,15 +470,12 @@ func waitForPodResizeActuation(podClient *framework.PodClient, pod, patchedPod *
// Wait for pod resource allocations to equal expected values after resize // Wait for pod resource allocations to equal expected values after resize
resizedPod, aErr := waitPodAllocationsEqualsExpected() resizedPod, aErr := waitPodAllocationsEqualsExpected()
framework.ExpectNoError(aErr, "failed to verify pod resource allocation values equals expected values") framework.ExpectNoError(aErr, "failed to verify pod resource allocation values equals expected values")
//TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added
if !isPodOnCgroupv2Node(pod) {
// Wait for container cgroup values to equal expected cgroup values after resize // Wait for container cgroup values to equal expected cgroup values after resize
cErr := waitContainerCgroupValuesEqualsExpected() cErr := waitContainerCgroupValuesEqualsExpected()
framework.ExpectNoError(cErr, "failed to verify container cgroup values equals expected values") framework.ExpectNoError(cErr, "failed to verify container cgroup values equals expected values")
} //TODO(vinaykul,InPlacePodVerticalScaling): Remove this check once base-OS updates to containerd>=1.6.9
//TODO(vinaykul,InPlacePodVerticalScaling): Remove featureGatePostAlpha upon exiting Alpha.
// containerd needs to add CRI support before Beta (See Node KEP #2273) // containerd needs to add CRI support before Beta (See Node KEP #2273)
if isFeatureGatePostAlpha() { if isInPlaceResizeSupportedByRuntime(c, pod.Spec.NodeName) {
// Wait for PodSpec container resources to equal PodStatus container resources indicating resize is complete // Wait for PodSpec container resources to equal PodStatus container resources indicating resize is complete
rPod, rErr := waitPodStatusResourcesEqualSpecResources() rPod, rErr := waitPodStatusResourcesEqualSpecResources()
framework.ExpectNoError(rErr, "failed to verify pod spec resources equals pod status resources") framework.ExpectNoError(rErr, "failed to verify pod spec resources equals pod status resources")
@ -464,6 +488,10 @@ func waitForPodResizeActuation(podClient *framework.PodClient, pod, patchedPod *
func doPodResizeTests() { func doPodResizeTests() {
f := framework.NewDefaultFramework("pod-resize") f := framework.NewDefaultFramework("pod-resize")
var podClient *e2epod.PodClient
ginkgo.BeforeEach(func() {
podClient = e2epod.NewPodClient(f)
})
type testCase struct { type testCase struct {
name string name string
@ -1175,7 +1203,7 @@ func doPodResizeTests() {
for idx := range tests { for idx := range tests {
tc := tests[idx] tc := tests[idx]
ginkgo.It(tc.name, func() { ginkgo.It(tc.name, func(ctx context.Context) {
var testPod, patchedPod *v1.Pod var testPod, patchedPod *v1.Pod
var pErr error var pErr error
@ -1185,12 +1213,12 @@ func doPodResizeTests() {
testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers) testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers)
ginkgo.By("creating pod") ginkgo.By("creating pod")
newPod := f.PodClient().CreateSync(testPod) newPod := podClient.CreateSync(ctx, testPod)
ginkgo.By("verifying the pod is in kubernetes") ginkgo.By("verifying the pod is in kubernetes")
selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp})) selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp}))
options := metav1.ListOptions{LabelSelector: selector.String()} options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := f.PodClient().List(context.TODO(), options) podList, err := podClient.List(context.TODO(), options)
framework.ExpectNoError(err, "failed to query for pods") framework.ExpectNoError(err, "failed to query for pods")
gomega.Expect(len(podList.Items) == 1) gomega.Expect(len(podList.Items) == 1)
@ -1212,13 +1240,10 @@ func doPodResizeTests() {
verifyPodAllocations(patchedPod, tc.containers, true) verifyPodAllocations(patchedPod, tc.containers, true)
ginkgo.By("waiting for resize to be actuated") ginkgo.By("waiting for resize to be actuated")
resizedPod := waitForPodResizeActuation(f.PodClient(), newPod, patchedPod, tc.expected) resizedPod := waitForPodResizeActuation(f.ClientSet, podClient, newPod, patchedPod, tc.expected)
ginkgo.By("verifying pod container's cgroup values after resize") ginkgo.By("verifying pod container's cgroup values after resize")
//TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added
if !isPodOnCgroupv2Node(resizedPod) {
verifyPodContainersCgroupValues(resizedPod, tc.expected, true) verifyPodContainersCgroupValues(resizedPod, tc.expected, true)
}
ginkgo.By("verifying pod resources after resize") ginkgo.By("verifying pod resources after resize")
verifyPodResources(resizedPod, tc.expected) verifyPodResources(resizedPod, tc.expected)
@ -1227,7 +1252,7 @@ func doPodResizeTests() {
verifyPodAllocations(resizedPod, tc.expected, true) verifyPodAllocations(resizedPod, tc.expected, true)
ginkgo.By("deleting pod") ginkgo.By("deleting pod")
err = e2epod.DeletePodWithWait(f.ClientSet, newPod) err = e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod)
framework.ExpectNoError(err, "failed to delete pod") framework.ExpectNoError(err, "failed to delete pod")
}) })
} }
@ -1235,8 +1260,12 @@ func doPodResizeTests() {
func doPodResizeResourceQuotaTests() { func doPodResizeResourceQuotaTests() {
f := framework.NewDefaultFramework("pod-resize-resource-quota") f := framework.NewDefaultFramework("pod-resize-resource-quota")
var podClient *e2epod.PodClient
ginkgo.BeforeEach(func() {
podClient = e2epod.NewPodClient(f)
})
ginkgo.It("pod-resize-resource-quota-test", func() { ginkgo.It("pod-resize-resource-quota-test", func(ctx context.Context) {
resourceQuota := v1.ResourceQuota{ resourceQuota := v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "resize-resource-quota", Name: "resize-resource-quota",
@ -1282,13 +1311,13 @@ func doPodResizeResourceQuotaTests() {
testPod2 := makeTestPod(f.Namespace.Name, "testpod2", tStamp, containers) testPod2 := makeTestPod(f.Namespace.Name, "testpod2", tStamp, containers)
ginkgo.By("creating pods") ginkgo.By("creating pods")
newPod1 := f.PodClient().CreateSync(testPod1) newPod1 := podClient.CreateSync(ctx, testPod1)
newPod2 := f.PodClient().CreateSync(testPod2) newPod2 := podClient.CreateSync(ctx, testPod2)
ginkgo.By("verifying the pod is in kubernetes") ginkgo.By("verifying the pod is in kubernetes")
selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp})) selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp}))
options := metav1.ListOptions{LabelSelector: selector.String()} options := metav1.ListOptions{LabelSelector: selector.String()}
podList, listErr := f.PodClient().List(context.TODO(), options) podList, listErr := podClient.List(context.TODO(), options)
framework.ExpectNoError(listErr, "failed to query for pods") framework.ExpectNoError(listErr, "failed to query for pods")
gomega.Expect(len(podList.Items) == 2) gomega.Expect(len(podList.Items) == 2)
@ -1305,13 +1334,10 @@ func doPodResizeResourceQuotaTests() {
verifyPodAllocations(patchedPod, containers, true) verifyPodAllocations(patchedPod, containers, true)
ginkgo.By("waiting for resize to be actuated") ginkgo.By("waiting for resize to be actuated")
resizedPod := waitForPodResizeActuation(f.PodClient(), newPod1, patchedPod, expected) resizedPod := waitForPodResizeActuation(f.ClientSet, podClient, newPod1, patchedPod, expected)
ginkgo.By("verifying pod container's cgroup values after resize") ginkgo.By("verifying pod container's cgroup values after resize")
//TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added
if !isPodOnCgroupv2Node(resizedPod) {
verifyPodContainersCgroupValues(resizedPod, expected, true) verifyPodContainersCgroupValues(resizedPod, expected, true)
}
ginkgo.By("verifying pod resources after resize") ginkgo.By("verifying pod resources after resize")
verifyPodResources(resizedPod, expected) verifyPodResources(resizedPod, expected)
@ -1319,18 +1345,6 @@ func doPodResizeResourceQuotaTests() {
ginkgo.By("verifying pod allocations after resize") ginkgo.By("verifying pod allocations after resize")
verifyPodAllocations(resizedPod, expected, true) verifyPodAllocations(resizedPod, expected, true)
ginkgo.By(fmt.Sprintf("patching pod %s for resize with CPU exceeding resource quota", resizedPod.Name))
_, pErrExceedCPU := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(),
resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedCPU), metav1.PatchOptions{})
framework.ExpectError(pErrExceedCPU, "exceeded quota: %s, requested: cpu=200m, used: cpu=700m, limited: cpu=800m",
resourceQuota.Name)
ginkgo.By("verifying pod patched for resize exceeding CPU resource quota remains unchanged")
patchedPodExceedCPU, pErrEx1 := f.PodClient().Get(context.TODO(), resizedPod.Name, metav1.GetOptions{})
framework.ExpectNoError(pErrEx1, "failed to get pod post exceed CPU resize")
verifyPodResources(patchedPodExceedCPU, expected)
verifyPodAllocations(patchedPodExceedCPU, expected, true)
ginkgo.By("patching pod for resize with memory exceeding resource quota") ginkgo.By("patching pod for resize with memory exceeding resource quota")
_, pErrExceedMemory := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(), _, pErrExceedMemory := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(),
resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedMemory), metav1.PatchOptions{}) resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedMemory), metav1.PatchOptions{})
@ -1338,21 +1352,37 @@ func doPodResizeResourceQuotaTests() {
resourceQuota.Name) resourceQuota.Name)
ginkgo.By("verifying pod patched for resize exceeding memory resource quota remains unchanged") ginkgo.By("verifying pod patched for resize exceeding memory resource quota remains unchanged")
patchedPodExceedMemory, pErrEx2 := f.PodClient().Get(context.TODO(), resizedPod.Name, metav1.GetOptions{}) patchedPodExceedMemory, pErrEx2 := podClient.Get(context.TODO(), resizedPod.Name, metav1.GetOptions{})
framework.ExpectNoError(pErrEx2, "failed to get pod post exceed memory resize") framework.ExpectNoError(pErrEx2, "failed to get pod post exceed memory resize")
verifyPodResources(patchedPodExceedMemory, expected) verifyPodResources(patchedPodExceedMemory, expected)
verifyPodAllocations(patchedPodExceedMemory, expected, true) verifyPodAllocations(patchedPodExceedMemory, expected, true)
ginkgo.By(fmt.Sprintf("patching pod %s for resize with CPU exceeding resource quota", resizedPod.Name))
_, pErrExceedCPU := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(),
resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedCPU), metav1.PatchOptions{})
framework.ExpectError(pErrExceedCPU, "exceeded quota: %s, requested: cpu=200m, used: cpu=700m, limited: cpu=800m",
resourceQuota.Name)
ginkgo.By("verifying pod patched for resize exceeding CPU resource quota remains unchanged")
patchedPodExceedCPU, pErrEx1 := podClient.Get(context.TODO(), resizedPod.Name, metav1.GetOptions{})
framework.ExpectNoError(pErrEx1, "failed to get pod post exceed CPU resize")
verifyPodResources(patchedPodExceedCPU, expected)
verifyPodAllocations(patchedPodExceedCPU, expected, true)
ginkgo.By("deleting pods") ginkgo.By("deleting pods")
delErr1 := e2epod.DeletePodWithWait(f.ClientSet, newPod1) delErr1 := e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod1)
framework.ExpectNoError(delErr1, "failed to delete pod %s", newPod1.Name) framework.ExpectNoError(delErr1, "failed to delete pod %s", newPod1.Name)
delErr2 := e2epod.DeletePodWithWait(f.ClientSet, newPod2) delErr2 := e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod2)
framework.ExpectNoError(delErr2, "failed to delete pod %s", newPod2.Name) framework.ExpectNoError(delErr2, "failed to delete pod %s", newPod2.Name)
}) })
} }
func doPodResizeErrorTests() { func doPodResizeErrorTests() {
f := framework.NewDefaultFramework("pod-resize-errors") f := framework.NewDefaultFramework("pod-resize-errors")
var podClient *e2epod.PodClient
ginkgo.BeforeEach(func() {
podClient = e2epod.NewPodClient(f)
})
type testCase struct { type testCase struct {
name string name string
@ -1384,7 +1414,7 @@ func doPodResizeErrorTests() {
for idx := range tests { for idx := range tests {
tc := tests[idx] tc := tests[idx]
ginkgo.It(tc.name, func() { ginkgo.It(tc.name, func(ctx context.Context) {
var testPod, patchedPod *v1.Pod var testPod, patchedPod *v1.Pod
var pErr error var pErr error
@ -1394,12 +1424,12 @@ func doPodResizeErrorTests() {
testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers) testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers)
ginkgo.By("creating pod") ginkgo.By("creating pod")
newPod := f.PodClient().CreateSync(testPod) newPod := podClient.CreateSync(ctx, testPod)
ginkgo.By("verifying the pod is in kubernetes") ginkgo.By("verifying the pod is in kubernetes")
selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp})) selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp}))
options := metav1.ListOptions{LabelSelector: selector.String()} options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := f.PodClient().List(context.TODO(), options) podList, err := podClient.List(context.TODO(), options)
framework.ExpectNoError(err, "failed to query for pods") framework.ExpectNoError(err, "failed to query for pods")
gomega.Expect(len(podList.Items) == 1) gomega.Expect(len(podList.Items) == 1)
@ -1422,10 +1452,7 @@ func doPodResizeErrorTests() {
} }
ginkgo.By("verifying pod container's cgroup values after patch") ginkgo.By("verifying pod container's cgroup values after patch")
//TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added
if !isPodOnCgroupv2Node(patchedPod) {
verifyPodContainersCgroupValues(patchedPod, tc.expected, true) verifyPodContainersCgroupValues(patchedPod, tc.expected, true)
}
ginkgo.By("verifying pod resources after patch") ginkgo.By("verifying pod resources after patch")
verifyPodResources(patchedPod, tc.expected) verifyPodResources(patchedPod, tc.expected)
@ -1434,12 +1461,178 @@ func doPodResizeErrorTests() {
verifyPodAllocations(patchedPod, tc.expected, true) verifyPodAllocations(patchedPod, tc.expected, true)
ginkgo.By("deleting pod") ginkgo.By("deleting pod")
err = e2epod.DeletePodWithWait(f.ClientSet, newPod) err = e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod)
framework.ExpectNoError(err, "failed to delete pod") framework.ExpectNoError(err, "failed to delete pod")
}) })
} }
} }
func doPodResizeSchedulerTests() {
f := framework.NewDefaultFramework("pod-resize-scheduler")
var podClient *e2epod.PodClient
ginkgo.BeforeEach(func() {
podClient = e2epod.NewPodClient(f)
})
ginkgo.It("pod-resize-scheduler-tests", func(ctx context.Context) {
nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
framework.ExpectNoError(err, "failed to get running nodes")
gomega.Expect(len(nodes.Items)).To(gomega.BeNumerically(">", 0), "expected at least one schedulable node")
framework.Logf("Found %d schedulable nodes", len(nodes.Items))
//
// Calculate available CPU. nodeAvailableCPU = nodeAllocatableCPU - sum(podAllocatedCPU)
//
getNodeAllocatableAndAvailableMilliCPUValues := func(n *v1.Node) (int64, int64) {
gomega.Expect(n.Status.Allocatable).NotTo(gomega.BeNil(), "node %s reported no allocatable resources", n.Name)
nodeAllocatableMilliCPU := n.Status.Allocatable.Cpu().MilliValue()
podAllocatedMilliCPU := int64(0)
listOptions := metav1.ListOptions{FieldSelector: "spec.nodeName=" + n.Name}
podList, err := f.ClientSet.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), listOptions)
framework.ExpectNoError(err, "failed to get running pods")
framework.Logf("Found %d pods on node '%s'", len(podList.Items), n.Name)
for _, pod := range podList.Items {
podRequestMilliCPU := resourceapi.GetResourceRequest(&pod, v1.ResourceCPU)
podAllocatedMilliCPU += podRequestMilliCPU
}
nodeAvailableMilliCPU := nodeAllocatableMilliCPU - podAllocatedMilliCPU
return nodeAllocatableMilliCPU, nodeAvailableMilliCPU
}
ginkgo.By("Find node CPU resources available for allocation!")
node := nodes.Items[0]
nodeAllocatableMilliCPU, nodeAvailableMilliCPU := getNodeAllocatableAndAvailableMilliCPUValues(&node)
framework.Logf("Node '%s': NodeAllocatable MilliCPUs = %dm. MilliCPUs currently available to allocate = %dm.",
node.Name, nodeAllocatableMilliCPU, nodeAvailableMilliCPU)
//
// Scheduler focussed pod resize E2E test case #1:
// 1. Create pod1 and pod2 on node such that pod1 has enough CPU to be scheduled, but pod2 does not.
// 2. Resize pod2 down so that it fits on the node and can be scheduled.
// 3. Verify that pod2 gets scheduled and comes up and running.
//
testPod1CPUQuantity := resource.NewMilliQuantity(nodeAvailableMilliCPU/2, resource.DecimalSI)
testPod2CPUQuantity := resource.NewMilliQuantity(nodeAvailableMilliCPU, resource.DecimalSI)
testPod2CPUQuantityResized := resource.NewMilliQuantity(testPod1CPUQuantity.MilliValue()/2, resource.DecimalSI)
framework.Logf("TEST1: testPod1 initial CPU request is '%dm'", testPod1CPUQuantity.MilliValue())
framework.Logf("TEST1: testPod2 initial CPU request is '%dm'", testPod2CPUQuantity.MilliValue())
framework.Logf("TEST1: testPod2 resized CPU request is '%dm'", testPod2CPUQuantityResized.MilliValue())
c1 := []TestContainerInfo{
{
Name: "c1",
Resources: &ContainerResources{CPUReq: testPod1CPUQuantity.String(), CPULim: testPod1CPUQuantity.String()},
},
}
c2 := []TestContainerInfo{
{
Name: "c2",
Resources: &ContainerResources{CPUReq: testPod2CPUQuantity.String(), CPULim: testPod2CPUQuantity.String()},
},
}
patchTestpod2ToFitNode := fmt.Sprintf(`{
"spec": {
"containers": [
{
"name": "c2",
"resources": {"requests": {"cpu": "%dm"}, "limits": {"cpu": "%dm"}}
}
]
}
}`, testPod2CPUQuantityResized.MilliValue(), testPod2CPUQuantityResized.MilliValue())
tStamp := strconv.Itoa(time.Now().Nanosecond())
initDefaultResizePolicy(c1)
initDefaultResizePolicy(c2)
testPod1 := makeTestPod(f.Namespace.Name, "testpod1", tStamp, c1)
testPod2 := makeTestPod(f.Namespace.Name, "testpod2", tStamp, c2)
e2epod.SetNodeAffinity(&testPod1.Spec, node.Name)
e2epod.SetNodeAffinity(&testPod2.Spec, node.Name)
ginkgo.By(fmt.Sprintf("TEST1: Create pod '%s' that fits the node '%s'", testPod1.Name, node.Name))
testPod1 = podClient.CreateSync(ctx, testPod1)
framework.ExpectEqual(testPod1.Status.Phase, v1.PodRunning)
ginkgo.By(fmt.Sprintf("TEST1: Create pod '%s' that won't fit node '%s' with pod '%s' on it", testPod2.Name, node.Name, testPod1.Name))
testPod2 = podClient.Create(ctx, testPod2)
err = e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, testPod2.Name, testPod2.Namespace)
framework.ExpectNoError(err)
framework.ExpectEqual(testPod2.Status.Phase, v1.PodPending)
ginkgo.By(fmt.Sprintf("TEST1: Resize pod '%s' to fit in node '%s'", testPod2.Name, node.Name))
testPod2, pErr := f.ClientSet.CoreV1().Pods(testPod2.Namespace).Patch(context.TODO(),
testPod2.Name, types.StrategicMergePatchType, []byte(patchTestpod2ToFitNode), metav1.PatchOptions{})
framework.ExpectNoError(pErr, "failed to patch pod for resize")
ginkgo.By(fmt.Sprintf("TEST1: Verify that pod '%s' is running after resize", testPod2.Name))
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, testPod2))
//
// Scheduler focussed pod resize E2E test case #2
// 1. With pod1 + pod2 running on node above, create pod3 that requests more CPU than available, verify pending.
// 2. Resize pod1 down so that pod3 gets room to be scheduled.
// 3. Verify that pod3 is scheduled and running.
//
nodeAllocatableMilliCPU2, nodeAvailableMilliCPU2 := getNodeAllocatableAndAvailableMilliCPUValues(&node)
framework.Logf("TEST2: Node '%s': NodeAllocatable MilliCPUs = %dm. MilliCPUs currently available to allocate = %dm.",
node.Name, nodeAllocatableMilliCPU2, nodeAvailableMilliCPU2)
testPod3CPUQuantity := resource.NewMilliQuantity(nodeAvailableMilliCPU2+testPod1CPUQuantity.MilliValue()/2, resource.DecimalSI)
testPod1CPUQuantityResized := resource.NewMilliQuantity(testPod1CPUQuantity.MilliValue()/3, resource.DecimalSI)
framework.Logf("TEST2: testPod1 MilliCPUs after resize '%dm'", testPod1CPUQuantityResized.MilliValue())
c3 := []TestContainerInfo{
{
Name: "c3",
Resources: &ContainerResources{CPUReq: testPod3CPUQuantity.String(), CPULim: testPod3CPUQuantity.String()},
},
}
patchTestpod1ToMakeSpaceForPod3 := fmt.Sprintf(`{
"spec": {
"containers": [
{
"name": "c1",
"resources": {"requests": {"cpu": "%dm"},"limits": {"cpu": "%dm"}}
}
]
}
}`, testPod1CPUQuantityResized.MilliValue(), testPod1CPUQuantityResized.MilliValue())
tStamp = strconv.Itoa(time.Now().Nanosecond())
initDefaultResizePolicy(c3)
testPod3 := makeTestPod(f.Namespace.Name, "testpod3", tStamp, c3)
e2epod.SetNodeAffinity(&testPod3.Spec, node.Name)
ginkgo.By(fmt.Sprintf("TEST2: Create testPod3 '%s' that cannot fit node '%s' due to insufficient CPU.", testPod3.Name, node.Name))
testPod3 = podClient.Create(ctx, testPod3)
p3Err := e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, testPod3.Name, testPod3.Namespace)
framework.ExpectNoError(p3Err, "failed to create pod3 or pod3 did not become pending!")
framework.ExpectEqual(testPod3.Status.Phase, v1.PodPending)
ginkgo.By(fmt.Sprintf("TEST2: Resize pod '%s' to make enough space for pod '%s'", testPod1.Name, testPod3.Name))
testPod1, p1Err := f.ClientSet.CoreV1().Pods(testPod1.Namespace).Patch(context.TODO(),
testPod1.Name, types.StrategicMergePatchType, []byte(patchTestpod1ToMakeSpaceForPod3), metav1.PatchOptions{})
framework.ExpectNoError(p1Err, "failed to patch pod for resize")
ginkgo.By(fmt.Sprintf("TEST2: Verify pod '%s' is running after successfully resizing pod '%s'", testPod3.Name, testPod1.Name))
framework.Logf("TEST2: Pod '%s' CPU requests '%dm'", testPod1.Name, testPod1.Spec.Containers[0].Resources.Requests.Cpu().MilliValue())
framework.Logf("TEST2: Pod '%s' CPU requests '%dm'", testPod2.Name, testPod2.Spec.Containers[0].Resources.Requests.Cpu().MilliValue())
framework.Logf("TEST2: Pod '%s' CPU requests '%dm'", testPod3.Name, testPod3.Spec.Containers[0].Resources.Requests.Cpu().MilliValue())
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, testPod3))
ginkgo.By("deleting pods")
delErr1 := e2epod.DeletePodWithWait(ctx, f.ClientSet, testPod1)
framework.ExpectNoError(delErr1, "failed to delete pod %s", testPod1.Name)
delErr2 := e2epod.DeletePodWithWait(ctx, f.ClientSet, testPod2)
framework.ExpectNoError(delErr2, "failed to delete pod %s", testPod2.Name)
delErr3 := e2epod.DeletePodWithWait(ctx, f.ClientSet, testPod3)
framework.ExpectNoError(delErr3, "failed to delete pod %s", testPod3.Name)
})
}
var _ = SIGDescribe("[Serial] Pod InPlace Resize Container (scheduler-focussed) [Feature:InPlacePodVerticalScaling]", func() {
doPodResizeSchedulerTests()
})
var _ = SIGDescribe("Pod InPlace Resize Container [Feature:InPlacePodVerticalScaling]", func() { var _ = SIGDescribe("Pod InPlace Resize Container [Feature:InPlacePodVerticalScaling]", func() {
doPodResizeTests() doPodResizeTests()
doPodResizeResourceQuotaTests() doPodResizeResourceQuotaTests()