From 7db339dba22b2427059334aa48fb3b4f356e823a Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Fri, 4 Nov 2022 13:53:49 -0700 Subject: [PATCH] This commit contains the following: 1. Scheduler bug-fix + scheduler-focussed E2E tests 2. Add cgroup v2 support for in-place pod resize 3. Enable full E2E pod resize test for containerd>=1.6.9 and EventedPLEG related changes. Co-Authored-By: Vinay Kulkarni --- pkg/kubelet/cm/cgroup_manager_linux.go | 217 ++++++++---- pkg/kubelet/cm/cgroup_manager_linux_test.go | 82 +++++ pkg/kubelet/cm/cgroup_manager_unsupported.go | 20 +- pkg/kubelet/cm/container_manager_stub.go | 14 +- pkg/kubelet/cm/cpumanager/policy_static.go | 4 + pkg/kubelet/cm/fake_pod_container_manager.go | 24 +- pkg/kubelet/cm/memorymanager/policy_static.go | 4 + pkg/kubelet/cm/pod_container_manager_linux.go | 32 +- pkg/kubelet/cm/types.go | 28 +- pkg/kubelet/container/runtime.go | 6 +- pkg/kubelet/eviction/helpers_test.go | 9 +- pkg/kubelet/kubelet.go | 6 +- .../kuberuntime/kuberuntime_container.go | 3 +- .../kuberuntime_container_linux_test.go | 3 +- .../kuberuntime/kuberuntime_container_test.go | 3 +- .../kuberuntime/kuberuntime_manager.go | 28 +- .../kuberuntime/kuberuntime_manager_test.go | 14 +- pkg/kubelet/pleg/evented.go | 4 + pkg/kubelet/pleg/generic.go | 9 +- pkg/kubelet/pleg/pleg.go | 2 +- pkg/kubelet/prober/scale_test.go | 9 +- .../internal/queue/scheduling_queue.go | 45 ++- test/e2e/node/pod_resize.go | 317 ++++++++++++++---- 23 files changed, 636 insertions(+), 247 deletions(-) diff --git a/pkg/kubelet/cm/cgroup_manager_linux.go b/pkg/kubelet/cm/cgroup_manager_linux.go index 388e9e1c232..c4be02a45b2 100644 --- a/pkg/kubelet/cm/cgroup_manager_linux.go +++ b/pkg/kubelet/cm/cgroup_manager_linux.go @@ -32,6 +32,7 @@ import ( "github.com/opencontainers/runc/libcontainer/cgroups/manager" cgroupsystemd "github.com/opencontainers/runc/libcontainer/cgroups/systemd" libcontainerconfigs "github.com/opencontainers/runc/libcontainer/configs" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -47,7 +48,8 @@ const ( // MemoryMin is memory.min for cgroup v2 MemoryMin string = "memory.min" // MemoryHigh is memory.high for cgroup v2 - MemoryHigh string = "memory.high" + MemoryHigh string = "memory.high" + Cgroup2MaxCpuLimit string = "max" ) var RootCgroupName = CgroupName([]string{}) @@ -559,85 +561,188 @@ func (m *cgroupManagerImpl) MemoryUsage(name CgroupName) (int64, error) { return int64(val), err } -// Get the memory limit in bytes applied to the cgroup -func (m *cgroupManagerImpl) GetCgroupMemoryConfig(name CgroupName) (uint64, error) { - cgroupPaths := m.buildCgroupPaths(name) - cgroupMemoryPath, found := cgroupPaths["memory"] - if !found { - return 0, fmt.Errorf("failed to build memory cgroup fs path for cgroup %v", name) - } - memLimit, err := fscommon.GetCgroupParamUint(cgroupMemoryPath, "memory.limit_in_bytes") - if err != nil { - return 0, fmt.Errorf("failed to get memory.limit_in_bytes for cgroup %v: %v", name, err) - } - return memLimit, nil +// Convert cgroup v1 cpu.shares value to cgroup v2 cpu.weight +// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2254-cgroup-v2#phase-1-convert-from-cgroups-v1-settings-to-v2 +func CpuSharesToCpuWeight(cpuShares uint64) uint64 { + return uint64((((cpuShares - 2) * 9999) / 262142) + 1) } -// Get the cpu quota, cpu period, and cpu shares applied to the cgroup -func (m *cgroupManagerImpl) GetCgroupCpuConfig(name CgroupName) (int64, uint64, uint64, error) { - cgroupPaths := 
m.buildCgroupPaths(name) - cgroupCpuPath, found := cgroupPaths["cpu"] - if !found { - return 0, 0, 0, fmt.Errorf("failed to build CPU cgroup fs path for cgroup %v", name) - } - cpuQuotaStr, errQ := fscommon.GetCgroupParamString(cgroupCpuPath, "cpu.cfs_quota_us") +// Convert cgroup v2 cpu.weight value to cgroup v1 cpu.shares +// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2254-cgroup-v2#phase-1-convert-from-cgroups-v1-settings-to-v2 +func CpuWeightToCpuShares(cpuWeight uint64) uint64 { + return uint64((((cpuWeight - 1) * 262142) / 9999) + 2) +} + +func getCgroupv1CpuConfig(cgroupPath string) (*ResourceConfig, error) { + cpuQuotaStr, errQ := fscommon.GetCgroupParamString(cgroupPath, "cpu.cfs_quota_us") if errQ != nil { - return 0, 0, 0, fmt.Errorf("failed to read CPU quota for cgroup %v: %v", name, errQ) + return nil, fmt.Errorf("failed to read CPU quota for cgroup %v: %v", cgroupPath, errQ) } cpuQuota, errInt := strconv.ParseInt(cpuQuotaStr, 10, 64) if errInt != nil { - return 0, 0, 0, fmt.Errorf("failed to convert CPU quota as integer for cgroup %v: %v", name, errInt) + return nil, fmt.Errorf("failed to convert CPU quota as integer for cgroup %v: %v", cgroupPath, errInt) } - cpuPeriod, errP := fscommon.GetCgroupParamUint(cgroupCpuPath, "cpu.cfs_period_us") + cpuPeriod, errP := fscommon.GetCgroupParamUint(cgroupPath, "cpu.cfs_period_us") if errP != nil { - return 0, 0, 0, fmt.Errorf("failed to read CPU period for cgroup %v: %v", name, errP) + return nil, fmt.Errorf("failed to read CPU period for cgroup %v: %v", cgroupPath, errP) } - cpuShares, errS := fscommon.GetCgroupParamUint(cgroupCpuPath, "cpu.shares") - if errP != nil { - return 0, 0, 0, fmt.Errorf("failed to read CPU shares for cgroup %v: %v", name, errS) + cpuShares, errS := fscommon.GetCgroupParamUint(cgroupPath, "cpu.shares") + if errS != nil { + return nil, fmt.Errorf("failed to read CPU shares for cgroup %v: %v", cgroupPath, errS) } - return cpuQuota, cpuPeriod, cpuShares, nil + return &ResourceConfig{CPUShares: &cpuShares, CPUQuota: &cpuQuota, CPUPeriod: &cpuPeriod}, nil } -// Set the memory limit in bytes applied to the cgroup -func (m *cgroupManagerImpl) SetCgroupMemoryConfig(name CgroupName, memoryLimit int64) error { +func getCgroupv2CpuConfig(cgroupPath string) (*ResourceConfig, error) { + var cpuLimitStr, cpuPeriodStr string + cpuLimitAndPeriod, err := fscommon.GetCgroupParamString(cgroupPath, "cpu.max") + if err != nil { + return nil, fmt.Errorf("failed to read cpu.max file for cgroup %v: %v", cgroupPath, err) + } + numItems, errScan := fmt.Sscanf(cpuLimitAndPeriod, "%s %s", &cpuLimitStr, &cpuPeriodStr) + if errScan != nil || numItems != 2 { + return nil, fmt.Errorf("failed to correctly parse content of cpu.max file ('%s') for cgroup %v: %v", + cpuLimitAndPeriod, cgroupPath, errScan) + } + cpuLimit := int64(-1) + if cpuLimitStr != Cgroup2MaxCpuLimit { + cpuLimit, err = strconv.ParseInt(cpuLimitStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to convert CPU limit as integer for cgroup %v: %v", cgroupPath, err) + } + } + cpuPeriod, errPeriod := strconv.ParseUint(cpuPeriodStr, 10, 64) + if errPeriod != nil { + return nil, fmt.Errorf("failed to convert CPU period as integer for cgroup %v: %v", cgroupPath, errPeriod) + } + cpuWeight, errWeight := fscommon.GetCgroupParamUint(cgroupPath, "cpu.weight") + if errWeight != nil { + return nil, fmt.Errorf("failed to read CPU weight for cgroup %v: %v", cgroupPath, errWeight) + } + cpuShares := CpuWeightToCpuShares(cpuWeight) + return 
&ResourceConfig{CPUShares: &cpuShares, CPUQuota: &cpuLimit, CPUPeriod: &cpuPeriod}, nil +} + +func getCgroupCpuConfig(cgroupPath string) (*ResourceConfig, error) { + if libcontainercgroups.IsCgroup2UnifiedMode() { + return getCgroupv2CpuConfig(cgroupPath) + } else { + return getCgroupv1CpuConfig(cgroupPath) + } +} + +func getCgroupMemoryConfig(cgroupPath string) (*ResourceConfig, error) { + memLimitFile := "memory.limit_in_bytes" + if libcontainercgroups.IsCgroup2UnifiedMode() { + memLimitFile = "memory.max" + } + memLimit, err := fscommon.GetCgroupParamUint(cgroupPath, memLimitFile) + if err != nil { + return nil, fmt.Errorf("failed to read %s for cgroup %v: %v", memLimitFile, cgroupPath, err) + } + mLim := int64(memLimit) + //TODO(vinaykul,InPlacePodVerticalScaling): Add memory request support + return &ResourceConfig{Memory: &mLim}, nil + +} + +// Get the resource config values applied to the cgroup for specified resource type +func (m *cgroupManagerImpl) GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error) { cgroupPaths := m.buildCgroupPaths(name) - cgroupMemoryPath, found := cgroupPaths["memory"] + cgroupResourcePath, found := cgroupPaths[string(resource)] if !found { - return fmt.Errorf("failed to build memory cgroup fs path for cgroup %v", name) + return nil, fmt.Errorf("failed to build %v cgroup fs path for cgroup %v", resource, name) } - memLimit := strconv.FormatInt(memoryLimit, 10) - if err := os.WriteFile(filepath.Join(cgroupMemoryPath, "memory.limit_in_bytes"), []byte(memLimit), 0700); err != nil { - return fmt.Errorf("failed to write %v to %v: %v", memLimit, cgroupMemoryPath, err) + switch resource { + case v1.ResourceCPU: + return getCgroupCpuConfig(cgroupResourcePath) + case v1.ResourceMemory: + return getCgroupMemoryConfig(cgroupResourcePath) } - return nil + return nil, fmt.Errorf("unsupported resource %v for cgroup %v", resource, name) } -// Set the cpu quota, cpu period, and cpu shares applied to the cgroup -func (m *cgroupManagerImpl) SetCgroupCpuConfig(name CgroupName, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error { +func setCgroupv1CpuConfig(cgroupPath string, resourceConfig *ResourceConfig) error { var cpuQuotaStr, cpuPeriodStr, cpuSharesStr string - cgroupPaths := m.buildCgroupPaths(name) - cgroupCpuPath, found := cgroupPaths["cpu"] - if !found { - return fmt.Errorf("failed to build cpu cgroup fs path for cgroup %v", name) - } - if cpuQuota != nil { - cpuQuotaStr = strconv.FormatInt(*cpuQuota, 10) - if err := os.WriteFile(filepath.Join(cgroupCpuPath, "cpu.cfs_quota_us"), []byte(cpuQuotaStr), 0700); err != nil { - return fmt.Errorf("failed to write %v to %v: %v", cpuQuotaStr, cgroupCpuPath, err) + if resourceConfig.CPUQuota != nil { + cpuQuotaStr = strconv.FormatInt(*resourceConfig.CPUQuota, 10) + if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.cfs_quota_us"), []byte(cpuQuotaStr), 0700); err != nil { + return fmt.Errorf("failed to write %v to %v: %v", cpuQuotaStr, cgroupPath, err) } } - if cpuPeriod != nil { - cpuPeriodStr = strconv.FormatUint(*cpuPeriod, 10) - if err := os.WriteFile(filepath.Join(cgroupCpuPath, "cpu.cfs_period_us"), []byte(cpuPeriodStr), 0700); err != nil { - return fmt.Errorf("failed to write %v to %v: %v", cpuPeriodStr, cgroupCpuPath, err) + if resourceConfig.CPUPeriod != nil { + cpuPeriodStr = strconv.FormatUint(*resourceConfig.CPUPeriod, 10) + if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.cfs_period_us"), []byte(cpuPeriodStr), 0700); err != nil { + return fmt.Errorf("failed to write %v to %v: 
%v", cpuPeriodStr, cgroupPath, err) } } - if cpuShares != nil { - cpuSharesStr = strconv.FormatUint(*cpuShares, 10) - if err := os.WriteFile(filepath.Join(cgroupCpuPath, "cpu.shares"), []byte(cpuSharesStr), 0700); err != nil { - return fmt.Errorf("failed to write %v to %v: %v", cpuSharesStr, cgroupCpuPath, err) + if resourceConfig.CPUShares != nil { + cpuSharesStr = strconv.FormatUint(*resourceConfig.CPUShares, 10) + if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.shares"), []byte(cpuSharesStr), 0700); err != nil { + return fmt.Errorf("failed to write %v to %v: %v", cpuSharesStr, cgroupPath, err) } } return nil } + +func setCgroupv2CpuConfig(cgroupPath string, resourceConfig *ResourceConfig) error { + if resourceConfig.CPUQuota != nil { + if resourceConfig.CPUPeriod == nil { + return fmt.Errorf("CpuPeriod must be specified in order to set CpuLimit") + } + cpuLimitStr := Cgroup2MaxCpuLimit + if *resourceConfig.CPUQuota > -1 { + cpuLimitStr = strconv.FormatInt(*resourceConfig.CPUQuota, 10) + } + cpuPeriodStr := strconv.FormatUint(*resourceConfig.CPUPeriod, 10) + cpuMaxStr := fmt.Sprintf("%s %s", cpuLimitStr, cpuPeriodStr) + if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.max"), []byte(cpuMaxStr), 0700); err != nil { + return fmt.Errorf("failed to write %v to %v: %v", cpuMaxStr, cgroupPath, err) + } + } + if resourceConfig.CPUShares != nil { + cpuWeight := CpuSharesToCpuWeight(*resourceConfig.CPUShares) + cpuWeightStr := strconv.FormatUint(cpuWeight, 10) + if err := os.WriteFile(filepath.Join(cgroupPath, "cpu.weight"), []byte(cpuWeightStr), 0700); err != nil { + return fmt.Errorf("failed to write %v to %v: %v", cpuWeightStr, cgroupPath, err) + } + } + return nil +} + +func setCgroupCpuConfig(cgroupPath string, resourceConfig *ResourceConfig) error { + if libcontainercgroups.IsCgroup2UnifiedMode() { + return setCgroupv2CpuConfig(cgroupPath, resourceConfig) + } else { + return setCgroupv1CpuConfig(cgroupPath, resourceConfig) + } +} + +func setCgroupMemoryConfig(cgroupPath string, resourceConfig *ResourceConfig) error { + memLimitFile := "memory.limit_in_bytes" + if libcontainercgroups.IsCgroup2UnifiedMode() { + memLimitFile = "memory.max" + } + memLimit := strconv.FormatInt(*resourceConfig.Memory, 10) + if err := os.WriteFile(filepath.Join(cgroupPath, memLimitFile), []byte(memLimit), 0700); err != nil { + return fmt.Errorf("failed to write %v to %v/%v: %v", memLimit, cgroupPath, memLimitFile, err) + } + //TODO(vinaykul,InPlacePodVerticalScaling): Add memory request support + return nil +} + +// Set resource config for the specified resource type on the cgroup +func (m *cgroupManagerImpl) SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error { + cgroupPaths := m.buildCgroupPaths(name) + cgroupResourcePath, found := cgroupPaths[string(resource)] + if !found { + return fmt.Errorf("failed to build %v cgroup fs path for cgroup %v", resource, name) + } + switch resource { + case v1.ResourceCPU: + return setCgroupCpuConfig(cgroupResourcePath, resourceConfig) + case v1.ResourceMemory: + return setCgroupMemoryConfig(cgroupResourcePath, resourceConfig) + } + return nil +} diff --git a/pkg/kubelet/cm/cgroup_manager_linux_test.go b/pkg/kubelet/cm/cgroup_manager_linux_test.go index 4efeb97cf49..0e28eb7cca8 100644 --- a/pkg/kubelet/cm/cgroup_manager_linux_test.go +++ b/pkg/kubelet/cm/cgroup_manager_linux_test.go @@ -169,3 +169,85 @@ func TestParseSystemdToCgroupName(t *testing.T) { } } } + +func TestCpuSharesToCpuWeight(t *testing.T) { + testCases := 
[]struct { + cpuShares uint64 + expectedCpuWeight uint64 + }{ + { + cpuShares: 2, + expectedCpuWeight: 1, + }, + { + cpuShares: 3, + expectedCpuWeight: 1, + }, + { + cpuShares: 4, + expectedCpuWeight: 1, + }, + { + cpuShares: 28, + expectedCpuWeight: 1, + }, + { + cpuShares: 29, + expectedCpuWeight: 2, + }, + { + cpuShares: 245, + expectedCpuWeight: 10, + }, + { + cpuShares: 262144, + expectedCpuWeight: 10000, + }, + } + + for _, testCase := range testCases { + if actual := CpuSharesToCpuWeight(testCase.cpuShares); actual != testCase.expectedCpuWeight { + t.Errorf("cpuShares: %v, expectedCpuWeight: %v, actualCpuWeight: %v", + testCase.cpuShares, testCase.expectedCpuWeight, actual) + } + } +} + +func TestCpuWeightToCpuShares(t *testing.T) { + testCases := []struct { + cpuWeight uint64 + expectedCpuShares uint64 + }{ + { + cpuWeight: 1, + expectedCpuShares: 2, + }, + { + cpuWeight: 2, + expectedCpuShares: 28, + }, + { + cpuWeight: 3, + expectedCpuShares: 54, + }, + { + cpuWeight: 4, + expectedCpuShares: 80, + }, + { + cpuWeight: 245, + expectedCpuShares: 6398, + }, + { + cpuWeight: 10000, + expectedCpuShares: 262144, + }, + } + + for _, testCase := range testCases { + if actual := CpuWeightToCpuShares(testCase.cpuWeight); actual != testCase.expectedCpuShares { + t.Errorf("cpuWeight: %v, expectedCpuShares: %v, actualCpuShares: %v", + testCase.cpuWeight, testCase.expectedCpuShares, actual) + } + } +} diff --git a/pkg/kubelet/cm/cgroup_manager_unsupported.go b/pkg/kubelet/cm/cgroup_manager_unsupported.go index 6220f5c96e5..976a6c48c42 100644 --- a/pkg/kubelet/cm/cgroup_manager_unsupported.go +++ b/pkg/kubelet/cm/cgroup_manager_unsupported.go @@ -19,7 +19,11 @@ limitations under the License. package cm -import "errors" +import ( + "errors" + + v1 "k8s.io/api/core/v1" +) type unsupportedCgroupManager struct{} @@ -77,19 +81,11 @@ func (m *unsupportedCgroupManager) ReduceCPULimits(cgroupName CgroupName) error return nil } -func (m *unsupportedCgroupManager) GetCgroupMemoryConfig(name CgroupName) (uint64, error) { - return 0, errNotSupported +func (m *unsupportedCgroupManager) GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error) { + return nil, errNotSupported } -func (m *unsupportedCgroupManager) GetCgroupCpuConfig(name CgroupName) (int64, uint64, uint64, error) { - return 0, 0, 0, errNotSupported -} - -func (m *unsupportedCgroupManager) SetCgroupMemoryConfig(name CgroupName, memoryLimit int64) error { - return errNotSupported -} - -func (m *unsupportedCgroupManager) SetCgroupCpuConfig(name CgroupName, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error { +func (m *unsupportedCgroupManager) SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error { return errNotSupported } diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 4555fcbd651..56176e22d9c 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -95,19 +95,11 @@ func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceLi return cm.extendedPluginResources, cm.extendedPluginResources, []string{} } -func (m *podContainerManagerStub) GetPodCgroupMemoryConfig(_ *v1.Pod) (uint64, error) { - return 0, nil +func (m *podContainerManagerStub) GetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName) (*ResourceConfig, error) { + return nil, nil } -func (m *podContainerManagerStub) GetPodCgroupCpuConfig(_ *v1.Pod) (int64, uint64, uint64, error) { - return 0, 0, 0, nil -} - 
-func (m *podContainerManagerStub) SetPodCgroupMemoryConfig(_ *v1.Pod, _ int64) error { - return nil -} - -func (m *podContainerManagerStub) SetPodCgroupCpuConfig(_ *v1.Pod, _ *int64, _, _ *uint64) error { +func (m *podContainerManagerStub) SetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName, _ *ResourceConfig) error { return nil } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 69a410b0d0f..a097a8f04e1 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -382,6 +382,10 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int return 0 } cpuQuantity := container.Resources.Requests[v1.ResourceCPU] + // In-place pod resize feature makes Container.Resources field mutable for CPU & memory. + // ResourcesAllocated holds the value of Container.Resources.Requests when the pod was admitted. + // We should return this value because this is what kubelet agreed to allocate for the container + // and the value configured with runtime. if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { cpuQuantity = cs.ResourcesAllocated[v1.ResourceCPU] diff --git a/pkg/kubelet/cm/fake_pod_container_manager.go b/pkg/kubelet/cm/fake_pod_container_manager.go index c77898f5231..b4cce4418fc 100644 --- a/pkg/kubelet/cm/fake_pod_container_manager.go +++ b/pkg/kubelet/cm/fake_pod_container_manager.go @@ -112,30 +112,16 @@ func (cm *FakePodContainerManager) GetPodCgroupMemoryUsage(_ *v1.Pod) (uint64, e return 0, nil } -func (cm *FakePodContainerManager) GetPodCgroupMemoryConfig(_ *v1.Pod) (uint64, error) { +func (cm *FakePodContainerManager) GetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName) (*ResourceConfig, error) { cm.Lock() defer cm.Unlock() - cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupMemoryConfig") - return 0, nil + cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupConfig") + return nil, nil } -func (cm *FakePodContainerManager) GetPodCgroupCpuConfig(_ *v1.Pod) (int64, uint64, uint64, error) { +func (cm *FakePodContainerManager) SetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName, _ *ResourceConfig) error { cm.Lock() defer cm.Unlock() - cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupCpuConfig") - return 0, 0, 0, nil -} - -func (cm *FakePodContainerManager) SetPodCgroupMemoryConfig(_ *v1.Pod, _ int64) error { - cm.Lock() - defer cm.Unlock() - cm.CalledFunctions = append(cm.CalledFunctions, "SetPodCgroupMemoryConfig") - return nil -} - -func (cm *FakePodContainerManager) SetPodCgroupCpuConfig(_ *v1.Pod, _ *int64, _, _ *uint64) error { - cm.Lock() - defer cm.Unlock() - cm.CalledFunctions = append(cm.CalledFunctions, "SetPodCgroupCpuConfig") + cm.CalledFunctions = append(cm.CalledFunctions, "SetPodCgroupConfig") return nil } diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go index c36436e18b6..1b14e804f84 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static.go +++ b/pkg/kubelet/cm/memorymanager/policy_static.go @@ -414,6 +414,10 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v func getRequestedResources(pod *v1.Pod, container *v1.Container) (map[v1.ResourceName]uint64, error) { requestedResources := map[v1.ResourceName]uint64{} resources := container.Resources.Requests + // In-place pod resize feature makes Container.Resources field mutable for 
CPU & memory. + // ResourcesAllocated holds the value of Container.Resources.Requests when the pod was admitted. + // We should return this value because this is what kubelet agreed to allocate for the container + // and the value configured with runtime. if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { resources = cs.ResourcesAllocated diff --git a/pkg/kubelet/cm/pod_container_manager_linux.go b/pkg/kubelet/cm/pod_container_manager_linux.go index eab6f5d846c..2d4adffd1e2 100644 --- a/pkg/kubelet/cm/pod_container_manager_linux.go +++ b/pkg/kubelet/cm/pod_container_manager_linux.go @@ -129,24 +129,14 @@ func (m *podContainerManagerImpl) GetPodCgroupMemoryUsage(pod *v1.Pod) (uint64, return uint64(memUsage), nil } -func (m *podContainerManagerImpl) GetPodCgroupMemoryConfig(pod *v1.Pod) (uint64, error) { +func (m *podContainerManagerImpl) GetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName) (*ResourceConfig, error) { podCgroupName, _ := m.GetPodContainerName(pod) - return m.cgroupManager.GetCgroupMemoryConfig(podCgroupName) + return m.cgroupManager.GetCgroupConfig(podCgroupName, resource) } -func (m *podContainerManagerImpl) GetPodCgroupCpuConfig(pod *v1.Pod) (int64, uint64, uint64, error) { +func (m *podContainerManagerImpl) SetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName, resourceConfig *ResourceConfig) error { podCgroupName, _ := m.GetPodContainerName(pod) - return m.cgroupManager.GetCgroupCpuConfig(podCgroupName) -} - -func (m *podContainerManagerImpl) SetPodCgroupMemoryConfig(pod *v1.Pod, memoryLimit int64) error { - podCgroupName, _ := m.GetPodContainerName(pod) - return m.cgroupManager.SetCgroupMemoryConfig(podCgroupName, memoryLimit) -} - -func (m *podContainerManagerImpl) SetPodCgroupCpuConfig(pod *v1.Pod, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error { - podCgroupName, _ := m.GetPodContainerName(pod) - return m.cgroupManager.SetCgroupCpuConfig(podCgroupName, cpuQuota, cpuPeriod, cpuShares) + return m.cgroupManager.SetCgroupConfig(podCgroupName, resource, resourceConfig) } // Kill one process ID @@ -356,18 +346,10 @@ func (m *podContainerManagerNoop) GetPodCgroupMemoryUsage(_ *v1.Pod) (uint64, er return 0, nil } -func (m *podContainerManagerNoop) GetPodCgroupMemoryConfig(_ *v1.Pod) (uint64, error) { - return 0, nil +func (m *podContainerManagerNoop) GetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName) (*ResourceConfig, error) { + return nil, nil } -func (m *podContainerManagerNoop) GetPodCgroupCpuConfig(_ *v1.Pod) (int64, uint64, uint64, error) { - return 0, 0, 0, nil -} - -func (m *podContainerManagerNoop) SetPodCgroupMemoryConfig(_ *v1.Pod, _ int64) error { - return nil -} - -func (m *podContainerManagerNoop) SetPodCgroupCpuConfig(_ *v1.Pod, _ *int64, _, _ *uint64) error { +func (m *podContainerManagerNoop) SetPodCgroupConfig(_ *v1.Pod, _ v1.ResourceName, _ *ResourceConfig) error { return nil } diff --git a/pkg/kubelet/cm/types.go b/pkg/kubelet/cm/types.go index 68ea1bef37b..aff4c20d429 100644 --- a/pkg/kubelet/cm/types.go +++ b/pkg/kubelet/cm/types.go @@ -84,14 +84,10 @@ type CgroupManager interface { ReduceCPULimits(cgroupName CgroupName) error // MemoryUsage returns current memory usage of the specified cgroup, as read from the cgroupfs. MemoryUsage(name CgroupName) (int64, error) - // GetCgroupMemoryConfig returns the memory limit of the specified cgroup as read from cgroup fs. 
- GetCgroupMemoryConfig(name CgroupName) (uint64, error) - // GetCgroupCpuConfig returns the cpu quota, cpu period, and cpu shares of the specified cgroup as read from cgroup fs. - GetCgroupCpuConfig(name CgroupName) (int64, uint64, uint64, error) - // SetCgroupMemoryConfig sets the memory limit of the specified cgroup. - SetCgroupMemoryConfig(name CgroupName, memoryLimit int64) error - // SetCgroupCpuConfig sets the cpu quota, cpu period, and cpu shares of the specified cgroup. - SetCgroupCpuConfig(name CgroupName, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error + // Get the resource config values applied to the cgroup for specified resource type + GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error) + // Set resource config for the specified resource type on the cgroup + SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error } // QOSContainersInfo stores the names of containers per qos @@ -128,18 +124,12 @@ type PodContainerManager interface { // IsPodCgroup returns true if the literal cgroupfs name corresponds to a pod IsPodCgroup(cgroupfs string) (bool, types.UID) - // Get value of memory.usage_in_bytes for the pod Cgroup + // Get value of memory usage for the pod Cgroup GetPodCgroupMemoryUsage(pod *v1.Pod) (uint64, error) - // Get value of memory.limit_in_bytes for the pod Cgroup - GetPodCgroupMemoryConfig(pod *v1.Pod) (uint64, error) + // Get the resource config values applied to the pod cgroup for specified resource type + GetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName) (*ResourceConfig, error) - // Get values of cpu.cfs_quota_us, cpu.cfs_period_us, and cpu.shares for the pod Cgroup - GetPodCgroupCpuConfig(pod *v1.Pod) (int64, uint64, uint64, error) - - // Set value of memory.limit_in_bytes for the pod Cgroup - SetPodCgroupMemoryConfig(pod *v1.Pod, memoryLimit int64) error - - // Set values of cpu.cfs_quota_us, cpu.cfs_period_us, and cpu.shares for the pod Cgroup - SetPodCgroupCpuConfig(pod *v1.Pod, cpuQuota *int64, cpuPeriod, cpuShares *uint64) error + // Set resource config values for the specified resource type on the pod cgroup + SetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName, resourceConfig *ResourceConfig) error } diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index c2076142d06..5f54799c7ed 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -327,13 +327,13 @@ type PodStatus struct { // ContainerResources represents the Resources allocated to the running container. 
type ContainerResources struct { - // CPU capacity reserved for the container (cpu.shares) + // CPU capacity reserved for the container CPURequest *resource.Quantity - // CPU limit enforced on the container (cpu.cfs_quota_us) + // CPU limit enforced on the container CPULimit *resource.Quantity // Memory capaacity reserved for the container MemoryRequest *resource.Quantity - // Memory limit enforced on the container (memory.limit_in_bytes) + // Memory limit enforced on the container MemoryLimit *resource.Quantity } diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index 2efb9b9d91c..9b476afed76 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -31,8 +31,11 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + "k8s.io/kubernetes/pkg/features" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -2153,13 +2156,15 @@ func TestEvictonMessageWithResourceResize(t *testing.T) { result, found := stats[pod] return result, found } + threshold := []evictionapi.Threshold{} + observations := signalObservations{} for _, enabled := range []bool{true, false} { t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled)() - msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn) + msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn, threshold, observations) if enabled { - if !strings.Contains(msg, "testcontainer was using 150Mi, which exceeds its request of 100Mi") { + if !strings.Contains(msg, "testcontainer was using 150Mi, request is 100Mi") { t.Errorf("Expected 'exceeds memory' eviction message was not found.") } } else { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 31c51c0d3ec..8a08a8c98dd 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1861,7 +1861,7 @@ func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType, if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) { // While resize is in progress, periodically call PLEG to update pod cache runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) - if err := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { + if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod)) return false, err } @@ -2534,7 +2534,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU) memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory) if cpuRequests > cpuAvailable || memRequests > memAvailable { - klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "Pod", pod.Name) + klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", pod.Name) return false, nil, v1.PodResizeStatusInfeasible } @@ -2549,7 +2549,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) if ok, 
failReason, failMessage := kl.canAdmitPod(otherActivePods, pod); !ok { // Log reason and return. Let the next sync iteration retry the resize - klog.V(3).InfoS("Resize cannot be accommodated", "Pod", pod.Name, "Reason", failReason, "Message", failMessage) + klog.V(3).InfoS("Resize cannot be accommodated", "pod", pod.Name, "reason", failReason, "message", failMessage) return false, nil, v1.PodResizeStatusDeferred } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index c0c55d60687..745d70fc7ed 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -366,7 +366,8 @@ func (m *kubeGenericRuntimeManager) updateContainerResources(pod *v1.Pod, contai if containerResources == nil { return fmt.Errorf("container %q updateContainerResources failed: cannot generate resources config", containerID.String()) } - err := m.runtimeService.UpdateContainerResources(containerID.ID, containerResources) + ctx := context.Background() + err := m.runtimeService.UpdateContainerResources(ctx, containerID.ID, containerResources) if err != nil { klog.ErrorS(err, "UpdateContainerResources failed", "container", containerID.String()) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go index 3f8c70219ea..2297cffcdf9 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go @@ -865,15 +865,16 @@ func TestGenerateLinuxContainerResources(t *testing.T) { if tc.scalingFg { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)() } - tc.expected.HugepageLimits = []*runtimeapi.HugepageLimit{{PageSize: "2MB", Limit: 0}, {PageSize: "1GB", Limit: 0}} pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests} if len(tc.cStatus) > 0 { pod.Status.ContainerStatuses = tc.cStatus } resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false) + tc.expected.HugepageLimits = resources.HugepageLimits if diff.ObjectDiff(resources, tc.expected) != "" { t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources) } }) } + //TODO(vinaykul,InPlacePodVerticalScaling): Add unit tests for cgroup v1 & v2 } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go index d7d26877cee..b0cda9911da 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go @@ -828,7 +828,8 @@ func TestUpdateContainerResources(t *testing.T) { _, fakeContainers := makeAndSetFakePod(t, m, fakeRuntime, pod) assert.Equal(t, len(fakeContainers), 1) - cStatus, err := m.getPodContainerStatuses(pod.UID, pod.Name, pod.Namespace) + ctx := context.Background() + cStatus, err := m.getPodContainerStatuses(ctx, pod.UID, pod.Name, pod.Namespace) assert.NoError(t, err) containerID := cStatus[0].ID diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index bf5a77816c3..82eaa67e7a2 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -647,13 +647,15 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *ku var err error switch rName { case v1.ResourceCPU: + 
podCpuResources := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod} if setLimitValue == true { - err = pcm.SetPodCgroupCpuConfig(pod, podResources.CpuQuota, podResources.CpuPeriod, nil) + podCpuResources.CPUQuota = podResources.CPUQuota } else { - err = pcm.SetPodCgroupCpuConfig(pod, nil, podResources.CpuPeriod, podResources.CpuShares) + podCpuResources.CPUShares = podResources.CPUShares } + err = pcm.SetPodCgroupConfig(pod, rName, podCpuResources) case v1.ResourceMemory: - err = pcm.SetPodCgroupMemoryConfig(pod, *podResources.Memory) + err = pcm.SetPodCgroupConfig(pod, rName, podResources) } if err != nil { klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name) @@ -693,9 +695,9 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *ku return err } if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources { - currentPodMemoryLimit, err := pcm.GetPodCgroupMemoryConfig(pod) + currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory) if err != nil { - klog.ErrorS(err, "GetPodCgroupMemoryConfig failed", "pod", pod.Name) + klog.ErrorS(err, "GetPodCgroupConfig for memory failed", "pod", pod.Name) result.Fail(err) return } @@ -710,20 +712,20 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *ku result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name)) return } - if errResize := resizeContainers(v1.ResourceMemory, int64(currentPodMemoryLimit), *podResources.Memory, 0, 0); errResize != nil { + if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil { result.Fail(errResize) return } } if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources { - currentPodCpuQuota, _, currentPodCPUShares, err := pcm.GetPodCgroupCpuConfig(pod) + currentPodCpuConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceCPU) if err != nil { - klog.ErrorS(err, "GetPodCgroupCpuConfig failed", "pod", pod.Name) + klog.ErrorS(err, "GetPodCgroupConfig for CPU failed", "pod", pod.Name) result.Fail(err) return } - if errResize := resizeContainers(v1.ResourceCPU, currentPodCpuQuota, *podResources.CpuQuota, - int64(currentPodCPUShares), int64(*podResources.CpuShares)); errResize != nil { + if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota, + int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil { result.Fail(errResize) return } @@ -780,7 +782,7 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res } // computePodActions checks whether the pod spec has changed and returns the changes if true. 
-func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { +func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod)) createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus) @@ -873,7 +875,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo) - latestPodStatus, err := m.GetPodStatus(podStatus.ID, pod.Name, pod.Namespace) + latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace) if err == nil { podStatus = latestPodStatus } @@ -982,7 +984,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku // 8. Create normal containers. func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // Step 1: Compute sandbox and container changes. - podContainerChanges := m.computePodActions(pod, podStatus) + podContainerChanges := m.computePodActions(ctx, pod, podStatus) klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod)) if podContainerChanges.CreateSandbox { ref, err := ref.GetReference(legacyscheme.Scheme, pod) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index ad33fe24121..8a9f1646fc8 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -1174,7 +1174,8 @@ func TestComputePodActions(t *testing.T) { if test.mutateStatusFn != nil { test.mutateStatusFn(status) } - actions := m.computePodActions(pod, status) + ctx := context.Background() + actions := m.computePodActions(ctx, pod, status) verifyActions(t, &test.actions, &actions, desc) if test.resetStatusFn != nil { test.resetStatusFn(status) @@ -1389,7 +1390,8 @@ func TestComputePodActionsWithInitContainers(t *testing.T) { if test.mutateStatusFn != nil { test.mutateStatusFn(status) } - actions := m.computePodActions(pod, status) + ctx := context.Background() + actions := m.computePodActions(ctx, pod, status) verifyActions(t, &test.actions, &actions, desc) } } @@ -1571,7 +1573,8 @@ func TestComputePodActionsWithInitAndEphemeralContainers(t *testing.T) { if test.mutateStatusFn != nil { test.mutateStatusFn(status) } - actions := m.computePodActions(pod, status) + ctx := context.Background() + actions := m.computePodActions(ctx, pod, status) verifyActions(t, &test.actions, &actions, desc) } } @@ -1964,7 +1967,8 @@ func TestComputePodActionsForPodResize(t *testing.T) { } } makeAndSetFakePod(t, m, fakeRuntime, pod) - status, _ := m.GetPodStatus(kps.ID, pod.Name, pod.Namespace) + ctx := context.Background() + status, _ := m.GetPodStatus(ctx, kps.ID, pod.Name, pod.Namespace) for idx := range pod.Spec.Containers { if rcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); rcs != nil { if csIdx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[idx].Name); found { @@ -1982,7 +1986,7 @@ func TestComputePodActionsForPodResize(t *testing.T) { test.mutatePodFn(pod) } expectedActions := 
test.getExpectedPodActionsFn(pod, status) - actions := m.computePodActions(pod, status) + actions := m.computePodActions(ctx, pod, status) verifyActions(t, expectedActions, &actions, desc) } } diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index 9e9a5420899..46ba5c1525c 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -409,3 +409,7 @@ func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodS } } } + +func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { + return fmt.Errorf("not implemented"), false +} diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 877f51c72c4..1ae955a1dee 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -482,14 +482,15 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p return err, g.cache.Set(pod.ID, status, err, timestamp) } -func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) error { +func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { + ctx := context.Background() if !g.cacheEnabled() { - return fmt.Errorf("pod cache disabled") + return fmt.Errorf("pod cache disabled"), false } if pod == nil { - return fmt.Errorf("pod cannot be nil") + return fmt.Errorf("pod cannot be nil"), false } - return g.updateCache(pod, pid) + return g.updateCache(ctx, pod, pid) } func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) { diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index b0c60b66c9a..2654f32d6fc 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -69,5 +69,5 @@ type PodLifecycleEventGenerator interface { Watch() chan *PodLifecycleEvent Healthy() (bool, error) Relist() - UpdateCache(*kubecontainer.Pod, types.UID) error + UpdateCache(*kubecontainer.Pod, types.UID) (error, bool) } diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go index 8b38feb1015..9b07ab851ec 100644 --- a/pkg/kubelet/prober/scale_test.go +++ b/pkg/kubelet/prober/scale_test.go @@ -19,6 +19,7 @@ package prober import ( "context" "fmt" + "io/ioutil" "net" "net/http" "sync" @@ -80,10 +81,16 @@ func TestTCPPortExhaustion(t *testing.T) { } for _, tt := range tests { t.Run(fmt.Sprintf(tt.name), func(t *testing.T) { + testRootDir := "" + if tempDir, err := ioutil.TempDir("", "kubelet_test."); err != nil { + t.Fatalf("can't make a temp rootdir: %v", err) + } else { + testRootDir = tempDir + } podManager := kubepod.NewBasicPodManager(nil) podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() m := NewManager( - status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker), + status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir), results.NewManager(), results.NewManager(), results.NewManager(), diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index e265878e6a0..1f639b5c421 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -38,10 +38,12 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" listersv1 "k8s.io/client-go/listers/core/v1" 
"k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/internal/heap" @@ -683,14 +685,47 @@ func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) { p.lock.Unlock() } +// isPodResourcesResizedDown returns true if a pod CPU and/or memory resize request has been +// admitted by kubelet, is 'InProgress', and results in a net sizing down of updated resources. +// It returns false if either CPU or memory resource is net resized up, or if no resize is in progress. +func isPodResourcesResizedDown(pod *v1.Pod) bool { + if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { + // TODO(vinaykul,wangchen615,InPlacePodVerticalScaling): Fix this to determine when a + // pod is truly resized down (might need oldPod if we cannot determine from Status alone) + if pod.Status.Resize == v1.PodResizeStatusInProgress { + return true + } + } + return false +} + // AssignedPodUpdated is called when a bound pod is updated. Change of labels // may make pending pods with matching affinity terms schedulable. func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { p.lock.Lock() - p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate) + if isPodResourcesResizedDown(pod) { + p.moveAllToActiveOrBackoffQueue(AssignedPodUpdate, nil) + } else { + p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate) + } p.lock.Unlock() } +// NOTE: this function assumes a lock has been acquired in the caller. +// moveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ. +// This function adds all pods and then signals the condition variable to ensure that +// if Pop() is waiting for an item, it receives the signal after all the pods are in the +// queue and the head is the highest priority pod. +func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) { + unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap)) + for _, pInfo := range p.unschedulablePods.podInfoMap { + if preCheck == nil || preCheck(pInfo.Pod) { + unschedulablePods = append(unschedulablePods, pInfo) + } + } + p.movePodsToActiveOrBackoffQueue(unschedulablePods, event) +} + // MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ. 
// This function adds all pods and then signals the condition variable to ensure that // if Pop() is waiting for an item, it receives the signal after all the pods are in the @@ -698,13 +733,7 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) { p.lock.Lock() defer p.lock.Unlock() - unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap)) - for _, pInfo := range p.unschedulablePods.podInfoMap { - if preCheck == nil || preCheck(pInfo.Pod) { - unschedulablePods = append(unschedulablePods, pInfo) - } - } - p.movePodsToActiveOrBackoffQueue(unschedulablePods, event) + p.moveAllToActiveOrBackoffQueue(event, preCheck) } // NOTE: this function assumes lock has been acquired in caller diff --git a/test/e2e/node/pod_resize.go b/test/e2e/node/pod_resize.go index 21c513c0330..9064f332b58 100644 --- a/test/e2e/node/pod_resize.go +++ b/test/e2e/node/pod_resize.go @@ -19,6 +19,7 @@ package node import ( "context" "fmt" + "regexp" "strconv" "strings" "time" @@ -29,16 +30,19 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/component-base/featuregate" + clientset "k8s.io/client-go/kubernetes" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/features" + resourceapi "k8s.io/kubernetes/pkg/api/v1/resource" kubecm "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/test/e2e/framework" + e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" imageutils "k8s.io/kubernetes/test/utils/image" + semver "github.com/blang/semver/v4" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) @@ -52,6 +56,7 @@ const ( Cgroupv2MemRequest string = "/sys/fs/cgroup/memory.min" Cgroupv2CPULimit string = "/sys/fs/cgroup/cpu.max" Cgroupv2CPURequest string = "/sys/fs/cgroup/cpu.weight" + CpuPeriod string = "100000" PollInterval time.Duration = 2 * time.Second PollTimeout time.Duration = 4 * time.Minute @@ -74,13 +79,24 @@ type TestContainerInfo struct { RestartCount int32 } -func isFeatureGatePostAlpha() bool { - if fs, found := utilfeature.DefaultFeatureGate.DeepCopy().GetAll()[features.InPlacePodVerticalScaling]; found { - if fs.PreRelease == featuregate.Alpha { +func isInPlaceResizeSupportedByRuntime(c clientset.Interface, nodeName string) bool { + //TODO(vinaykul,InPlacePodVerticalScaling): Can we optimize this? 
+ node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return false + } + re := regexp.MustCompile("containerd://(.*)") + match := re.FindStringSubmatch(node.Status.NodeInfo.ContainerRuntimeVersion) + if len(match) != 2 { + return false + } + if ver, verr := semver.ParseTolerant(match[1]); verr == nil { + if ver.Compare(semver.MustParse("1.6.9")) < 0 { return false } + return true } - return true + return false } func getTestResourceInfo(tcInfo TestContainerInfo) (v1.ResourceRequirements, v1.ResourceList, []v1.ContainerResizePolicy) { @@ -288,8 +304,9 @@ func verifyPodStatusResources(pod *v1.Pod, tcInfo []TestContainerInfo) { func isPodOnCgroupv2Node(pod *v1.Pod) bool { // Determine if pod is running on cgroupv2 or cgroupv1 node + //TODO(vinaykul,InPlacePodVerticalScaling): Is there a better way to determine this? cgroupv2File := "/sys/fs/cgroup/cgroup.controllers" - _, err := framework.RunKubectl(pod.Namespace, "exec", pod.Name, "--", "ls", cgroupv2File) + _, err := e2ekubectl.RunKubectl(pod.Namespace, "exec", pod.Name, "--", "ls", cgroupv2File) if err == nil { return true } @@ -308,9 +325,12 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl } verifyCgroupValue := func(cName, cgPath, expectedCgValue string) bool { cmd := []string{"head", "-n", "1", cgPath} - cgValue, err := framework.LookForStringInPodExecToContainer(pod.Namespace, pod.Name, cName, cmd, expectedCgValue, PollTimeout) + framework.Logf("Namespace %s Pod %s Container %s - looking for cgroup value %s in path %s", + pod.Namespace, pod.Name, cName, expectedCgValue, cgPath) + cgValue, err := e2epodoutput.LookForStringInPodExecToContainer(pod.Namespace, pod.Name, cName, cmd, expectedCgValue, PollTimeout) if flagError { - framework.ExpectNoError(err, "failed to find expected cgroup value in container") + framework.ExpectNoError(err, fmt.Sprintf("failed to find expected value '%s' in container cgroup '%s'", + expectedCgValue, cgPath)) } cgValue = strings.Trim(cgValue, "\n") if flagError { @@ -342,8 +362,11 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl cpuQuota = -1 } cpuLimitString = strconv.FormatInt(cpuQuota, 10) - if podOnCgroupv2Node && cpuLimitString == "-1" { - cpuLimitString = "max" + if podOnCgroupv2Node { + if cpuLimitString == "-1" { + cpuLimitString = "max" + } + cpuLimitString = fmt.Sprintf("%s %s", cpuLimitString, CpuPeriod) } memLimitString = strconv.FormatInt(memLimitInBytes, 10) if podOnCgroupv2Node && memLimitString == "0" { @@ -357,6 +380,10 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl if !verifyCgroupValue(ci.Name, cgroupCPULimit, cpuLimitString) { return false } + if podOnCgroupv2Node { + // convert cgroup v1 cpu.shares value to cgroup v2 cpu.weight value + cpuShares = int64(1 + ((cpuShares-2)*9999)/262142) + } if !verifyCgroupValue(ci.Name, cgroupCPURequest, strconv.FormatInt(cpuShares, 10)) { return false } @@ -365,7 +392,7 @@ func verifyPodContainersCgroupValues(pod *v1.Pod, tcInfo []TestContainerInfo, fl return true } -func waitForPodResizeActuation(podClient *framework.PodClient, pod, patchedPod *v1.Pod, expectedContainers []TestContainerInfo) *v1.Pod { +func waitForPodResizeActuation(c clientset.Interface, podClient *e2epod.PodClient, pod, patchedPod *v1.Pod, expectedContainers []TestContainerInfo) *v1.Pod { waitForContainerRestart := func() error { var restartContainersExpected []string @@ -443,15 +470,12 @@ func waitForPodResizeActuation(podClient 
*framework.PodClient, pod, patchedPod * // Wait for pod resource allocations to equal expected values after resize resizedPod, aErr := waitPodAllocationsEqualsExpected() framework.ExpectNoError(aErr, "failed to verify pod resource allocation values equals expected values") - //TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added - if !isPodOnCgroupv2Node(pod) { - // Wait for container cgroup values to equal expected cgroup values after resize - cErr := waitContainerCgroupValuesEqualsExpected() - framework.ExpectNoError(cErr, "failed to verify container cgroup values equals expected values") - } - //TODO(vinaykul,InPlacePodVerticalScaling): Remove featureGatePostAlpha upon exiting Alpha. + // Wait for container cgroup values to equal expected cgroup values after resize + cErr := waitContainerCgroupValuesEqualsExpected() + framework.ExpectNoError(cErr, "failed to verify container cgroup values equals expected values") + //TODO(vinaykul,InPlacePodVerticalScaling): Remove this check once base-OS updates to containerd>=1.6.9 // containerd needs to add CRI support before Beta (See Node KEP #2273) - if isFeatureGatePostAlpha() { + if isInPlaceResizeSupportedByRuntime(c, pod.Spec.NodeName) { // Wait for PodSpec container resources to equal PodStatus container resources indicating resize is complete rPod, rErr := waitPodStatusResourcesEqualSpecResources() framework.ExpectNoError(rErr, "failed to verify pod spec resources equals pod status resources") @@ -464,6 +488,10 @@ func waitForPodResizeActuation(podClient *framework.PodClient, pod, patchedPod * func doPodResizeTests() { f := framework.NewDefaultFramework("pod-resize") + var podClient *e2epod.PodClient + ginkgo.BeforeEach(func() { + podClient = e2epod.NewPodClient(f) + }) type testCase struct { name string @@ -1175,7 +1203,7 @@ func doPodResizeTests() { for idx := range tests { tc := tests[idx] - ginkgo.It(tc.name, func() { + ginkgo.It(tc.name, func(ctx context.Context) { var testPod, patchedPod *v1.Pod var pErr error @@ -1185,12 +1213,12 @@ func doPodResizeTests() { testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers) ginkgo.By("creating pod") - newPod := f.PodClient().CreateSync(testPod) + newPod := podClient.CreateSync(ctx, testPod) ginkgo.By("verifying the pod is in kubernetes") selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp})) options := metav1.ListOptions{LabelSelector: selector.String()} - podList, err := f.PodClient().List(context.TODO(), options) + podList, err := podClient.List(context.TODO(), options) framework.ExpectNoError(err, "failed to query for pods") gomega.Expect(len(podList.Items) == 1) @@ -1212,13 +1240,10 @@ func doPodResizeTests() { verifyPodAllocations(patchedPod, tc.containers, true) ginkgo.By("waiting for resize to be actuated") - resizedPod := waitForPodResizeActuation(f.PodClient(), newPod, patchedPod, tc.expected) + resizedPod := waitForPodResizeActuation(f.ClientSet, podClient, newPod, patchedPod, tc.expected) ginkgo.By("verifying pod container's cgroup values after resize") - //TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added - if !isPodOnCgroupv2Node(resizedPod) { - verifyPodContainersCgroupValues(resizedPod, tc.expected, true) - } + verifyPodContainersCgroupValues(resizedPod, tc.expected, true) ginkgo.By("verifying pod resources after resize") verifyPodResources(resizedPod, tc.expected) @@ -1227,7 +1252,7 @@ func doPodResizeTests() { verifyPodAllocations(resizedPod, tc.expected, 
true) ginkgo.By("deleting pod") - err = e2epod.DeletePodWithWait(f.ClientSet, newPod) + err = e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod) framework.ExpectNoError(err, "failed to delete pod") }) } @@ -1235,8 +1260,12 @@ func doPodResizeTests() { func doPodResizeResourceQuotaTests() { f := framework.NewDefaultFramework("pod-resize-resource-quota") + var podClient *e2epod.PodClient + ginkgo.BeforeEach(func() { + podClient = e2epod.NewPodClient(f) + }) - ginkgo.It("pod-resize-resource-quota-test", func() { + ginkgo.It("pod-resize-resource-quota-test", func(ctx context.Context) { resourceQuota := v1.ResourceQuota{ ObjectMeta: metav1.ObjectMeta{ Name: "resize-resource-quota", @@ -1282,13 +1311,13 @@ func doPodResizeResourceQuotaTests() { testPod2 := makeTestPod(f.Namespace.Name, "testpod2", tStamp, containers) ginkgo.By("creating pods") - newPod1 := f.PodClient().CreateSync(testPod1) - newPod2 := f.PodClient().CreateSync(testPod2) + newPod1 := podClient.CreateSync(ctx, testPod1) + newPod2 := podClient.CreateSync(ctx, testPod2) ginkgo.By("verifying the pod is in kubernetes") selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp})) options := metav1.ListOptions{LabelSelector: selector.String()} - podList, listErr := f.PodClient().List(context.TODO(), options) + podList, listErr := podClient.List(context.TODO(), options) framework.ExpectNoError(listErr, "failed to query for pods") gomega.Expect(len(podList.Items) == 2) @@ -1305,13 +1334,10 @@ func doPodResizeResourceQuotaTests() { verifyPodAllocations(patchedPod, containers, true) ginkgo.By("waiting for resize to be actuated") - resizedPod := waitForPodResizeActuation(f.PodClient(), newPod1, patchedPod, expected) + resizedPod := waitForPodResizeActuation(f.ClientSet, podClient, newPod1, patchedPod, expected) ginkgo.By("verifying pod container's cgroup values after resize") - //TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added - if !isPodOnCgroupv2Node(resizedPod) { - verifyPodContainersCgroupValues(resizedPod, expected, true) - } + verifyPodContainersCgroupValues(resizedPod, expected, true) ginkgo.By("verifying pod resources after resize") verifyPodResources(resizedPod, expected) @@ -1319,18 +1345,6 @@ func doPodResizeResourceQuotaTests() { ginkgo.By("verifying pod allocations after resize") verifyPodAllocations(resizedPod, expected, true) - ginkgo.By(fmt.Sprintf("patching pod %s for resize with CPU exceeding resource quota", resizedPod.Name)) - _, pErrExceedCPU := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(), - resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedCPU), metav1.PatchOptions{}) - framework.ExpectError(pErrExceedCPU, "exceeded quota: %s, requested: cpu=200m, used: cpu=700m, limited: cpu=800m", - resourceQuota.Name) - - ginkgo.By("verifying pod patched for resize exceeding CPU resource quota remains unchanged") - patchedPodExceedCPU, pErrEx1 := f.PodClient().Get(context.TODO(), resizedPod.Name, metav1.GetOptions{}) - framework.ExpectNoError(pErrEx1, "failed to get pod post exceed CPU resize") - verifyPodResources(patchedPodExceedCPU, expected) - verifyPodAllocations(patchedPodExceedCPU, expected, true) - ginkgo.By("patching pod for resize with memory exceeding resource quota") _, pErrExceedMemory := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(), resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedMemory), metav1.PatchOptions{}) @@ -1338,21 +1352,37 @@ func 
 			resourceQuota.Name)
 		ginkgo.By("verifying pod patched for resize exceeding memory resource quota remains unchanged")
-		patchedPodExceedMemory, pErrEx2 := f.PodClient().Get(context.TODO(), resizedPod.Name, metav1.GetOptions{})
+		patchedPodExceedMemory, pErrEx2 := podClient.Get(context.TODO(), resizedPod.Name, metav1.GetOptions{})
 		framework.ExpectNoError(pErrEx2, "failed to get pod post exceed memory resize")
 		verifyPodResources(patchedPodExceedMemory, expected)
 		verifyPodAllocations(patchedPodExceedMemory, expected, true)
+		ginkgo.By(fmt.Sprintf("patching pod %s for resize with CPU exceeding resource quota", resizedPod.Name))
+		_, pErrExceedCPU := f.ClientSet.CoreV1().Pods(resizedPod.Namespace).Patch(context.TODO(),
+			resizedPod.Name, types.StrategicMergePatchType, []byte(patchStringExceedCPU), metav1.PatchOptions{})
+		framework.ExpectError(pErrExceedCPU, "exceeded quota: %s, requested: cpu=200m, used: cpu=700m, limited: cpu=800m",
+			resourceQuota.Name)
+
+		ginkgo.By("verifying pod patched for resize exceeding CPU resource quota remains unchanged")
+		patchedPodExceedCPU, pErrEx1 := podClient.Get(context.TODO(), resizedPod.Name, metav1.GetOptions{})
+		framework.ExpectNoError(pErrEx1, "failed to get pod post exceed CPU resize")
+		verifyPodResources(patchedPodExceedCPU, expected)
+		verifyPodAllocations(patchedPodExceedCPU, expected, true)
+
 		ginkgo.By("deleting pods")
-		delErr1 := e2epod.DeletePodWithWait(f.ClientSet, newPod1)
+		delErr1 := e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod1)
 		framework.ExpectNoError(delErr1, "failed to delete pod %s", newPod1.Name)
-		delErr2 := e2epod.DeletePodWithWait(f.ClientSet, newPod2)
+		delErr2 := e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod2)
 		framework.ExpectNoError(delErr2, "failed to delete pod %s", newPod2.Name)
 	})
 }
 func doPodResizeErrorTests() {
 	f := framework.NewDefaultFramework("pod-resize-errors")
+	var podClient *e2epod.PodClient
+	ginkgo.BeforeEach(func() {
+		podClient = e2epod.NewPodClient(f)
+	})
 	type testCase struct {
 		name string
@@ -1384,7 +1414,7 @@ func doPodResizeErrorTests() {
 	for idx := range tests {
 		tc := tests[idx]
-		ginkgo.It(tc.name, func() {
+		ginkgo.It(tc.name, func(ctx context.Context) {
 			var testPod, patchedPod *v1.Pod
 			var pErr error
@@ -1394,12 +1424,12 @@ func doPodResizeErrorTests() {
 			testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers)
 			ginkgo.By("creating pod")
-			newPod := f.PodClient().CreateSync(testPod)
+			newPod := podClient.CreateSync(ctx, testPod)
 			ginkgo.By("verifying the pod is in kubernetes")
 			selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": tStamp}))
 			options := metav1.ListOptions{LabelSelector: selector.String()}
-			podList, err := f.PodClient().List(context.TODO(), options)
+			podList, err := podClient.List(context.TODO(), options)
 			framework.ExpectNoError(err, "failed to query for pods")
 			gomega.Expect(len(podList.Items) == 1)
@@ -1422,10 +1452,7 @@ func doPodResizeErrorTests() {
 			}
 			ginkgo.By("verifying pod container's cgroup values after patch")
-			//TODO(vinaykul,InPlacePodVerticalScaling): Remove this check when cgroupv2 support is added
-			if !isPodOnCgroupv2Node(patchedPod) {
-				verifyPodContainersCgroupValues(patchedPod, tc.expected, true)
-			}
+			verifyPodContainersCgroupValues(patchedPod, tc.expected, true)
 			ginkgo.By("verifying pod resources after patch")
 			verifyPodResources(patchedPod, tc.expected)
@@ -1434,12 +1461,178 @@ func doPodResizeErrorTests() {
 			verifyPodAllocations(patchedPod, tc.expected, true)
 			ginkgo.By("deleting pod")
-			err = e2epod.DeletePodWithWait(f.ClientSet, newPod)
+			err = e2epod.DeletePodWithWait(ctx, f.ClientSet, newPod)
 			framework.ExpectNoError(err, "failed to delete pod")
 		})
 	}
 }
+func doPodResizeSchedulerTests() {
+	f := framework.NewDefaultFramework("pod-resize-scheduler")
+	var podClient *e2epod.PodClient
+	ginkgo.BeforeEach(func() {
+		podClient = e2epod.NewPodClient(f)
+	})
+
+	ginkgo.It("pod-resize-scheduler-tests", func(ctx context.Context) {
+		nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
+		framework.ExpectNoError(err, "failed to get running nodes")
+		gomega.Expect(len(nodes.Items) > 0)
+		framework.Logf("Found %d schedulable nodes", len(nodes.Items))
+
+		//
+		// Calculate available CPU. nodeAvailableCPU = nodeAllocatableCPU - sum(podAllocatedCPU)
+		//
+		getNodeAllocatableAndAvailableMilliCPUValues := func(n *v1.Node) (int64, int64) {
+			nodeAllocatableMilliCPU := n.Status.Allocatable.Cpu().MilliValue()
+			gomega.Expect(n.Status.Allocatable != nil)
+			podAllocatedMilliCPU := int64(0)
+			listOptions := metav1.ListOptions{FieldSelector: "spec.nodeName=" + n.Name}
+			podList, err := f.ClientSet.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), listOptions)
+			framework.ExpectNoError(err, "failed to get running pods")
+			framework.Logf("Found %d pods on node '%s'", len(podList.Items), n.Name)
+			for _, pod := range podList.Items {
+				podRequestMilliCPU := resourceapi.GetResourceRequest(&pod, v1.ResourceCPU)
+				podAllocatedMilliCPU += podRequestMilliCPU
+			}
+			nodeAvailableMilliCPU := nodeAllocatableMilliCPU - podAllocatedMilliCPU
+			return nodeAllocatableMilliCPU, nodeAvailableMilliCPU
+		}
+
+		ginkgo.By("Find node CPU resources available for allocation!")
+		node := nodes.Items[0]
+		nodeAllocatableMilliCPU, nodeAvailableMilliCPU := getNodeAllocatableAndAvailableMilliCPUValues(&node)
+		framework.Logf("Node '%s': NodeAllocatable MilliCPUs = %dm. MilliCPUs currently available to allocate = %dm.",
+			node.Name, nodeAllocatableMilliCPU, nodeAvailableMilliCPU)
+
+		//
+		// Scheduler focussed pod resize E2E test case #1:
+		// 1. Create pod1 and pod2 on node such that pod1 has enough CPU to be scheduled, but pod2 does not.
+		// 2. Resize pod2 down so that it fits on the node and can be scheduled.
+		// 3. Verify that pod2 gets scheduled and comes up and running.
+		//
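+		// Sizing note for the code below: testPod1 requests half of the node's currently available CPU and
+		// testPod2 requests all of it, so testPod2 cannot be scheduled while testPod1 is running. The resize
+		// patch shrinks testPod2 to half of testPod1's request (a quarter of the available CPU), which fits
+		// in the capacity left over after testPod1 is placed.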
+		testPod1CPUQuantity := resource.NewMilliQuantity(nodeAvailableMilliCPU/2, resource.DecimalSI)
+		testPod2CPUQuantity := resource.NewMilliQuantity(nodeAvailableMilliCPU, resource.DecimalSI)
+		testPod2CPUQuantityResized := resource.NewMilliQuantity(testPod1CPUQuantity.MilliValue()/2, resource.DecimalSI)
+		framework.Logf("TEST1: testPod1 initial CPU request is '%dm'", testPod1CPUQuantity.MilliValue())
+		framework.Logf("TEST1: testPod2 initial CPU request is '%dm'", testPod2CPUQuantity.MilliValue())
+		framework.Logf("TEST1: testPod2 resized CPU request is '%dm'", testPod2CPUQuantityResized.MilliValue())
+
+		c1 := []TestContainerInfo{
+			{
+				Name: "c1",
+				Resources: &ContainerResources{CPUReq: testPod1CPUQuantity.String(), CPULim: testPod1CPUQuantity.String()},
+			},
+		}
+		c2 := []TestContainerInfo{
+			{
+				Name: "c2",
+				Resources: &ContainerResources{CPUReq: testPod2CPUQuantity.String(), CPULim: testPod2CPUQuantity.String()},
+			},
+		}
+		patchTestpod2ToFitNode := fmt.Sprintf(`{
+				"spec": {
+					"containers": [
+						{
+							"name": "c2",
+							"resources": {"requests": {"cpu": "%dm"}, "limits": {"cpu": "%dm"}}
+						}
+					]
+				}
+			}`, testPod2CPUQuantityResized.MilliValue(), testPod2CPUQuantityResized.MilliValue())

+		tStamp := strconv.Itoa(time.Now().Nanosecond())
+		initDefaultResizePolicy(c1)
+		initDefaultResizePolicy(c2)
+		testPod1 := makeTestPod(f.Namespace.Name, "testpod1", tStamp, c1)
+		testPod2 := makeTestPod(f.Namespace.Name, "testpod2", tStamp, c2)
+		e2epod.SetNodeAffinity(&testPod1.Spec, node.Name)
+		e2epod.SetNodeAffinity(&testPod2.Spec, node.Name)
+
+		ginkgo.By(fmt.Sprintf("TEST1: Create pod '%s' that fits the node '%s'", testPod1.Name, node.Name))
+		testPod1 = podClient.CreateSync(ctx, testPod1)
+		framework.ExpectEqual(testPod1.Status.Phase, v1.PodRunning)
+
+		ginkgo.By(fmt.Sprintf("TEST1: Create pod '%s' that won't fit node '%s' with pod '%s' on it", testPod2.Name, node.Name, testPod1.Name))
+		testPod2 = podClient.Create(ctx, testPod2)
+		err = e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, testPod2.Name, testPod2.Namespace)
+		framework.ExpectNoError(err)
+		framework.ExpectEqual(testPod2.Status.Phase, v1.PodPending)
+
+		ginkgo.By(fmt.Sprintf("TEST1: Resize pod '%s' to fit in node '%s'", testPod2.Name, node.Name))
+		testPod2, pErr := f.ClientSet.CoreV1().Pods(testPod2.Namespace).Patch(context.TODO(),
+			testPod2.Name, types.StrategicMergePatchType, []byte(patchTestpod2ToFitNode), metav1.PatchOptions{})
+		framework.ExpectNoError(pErr, "failed to patch pod for resize")
+
+		ginkgo.By(fmt.Sprintf("TEST1: Verify that pod '%s' is running after resize", testPod2.Name))
+		framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, testPod2))
+
+		//
+		// Scheduler focussed pod resize E2E test case #2
+		// 1. With pod1 + pod2 running on node above, create pod3 that requests more CPU than available, verify pending.
+		// 2. Resize pod1 down so that pod3 gets room to be scheduled.
+		// 3. Verify that pod3 is scheduled and running.
+		//
+		nodeAllocatableMilliCPU2, nodeAvailableMilliCPU2 := getNodeAllocatableAndAvailableMilliCPUValues(&node)
+		framework.Logf("TEST2: Node '%s': NodeAllocatable MilliCPUs = %dm. MilliCPUs currently available to allocate = %dm.",
+			node.Name, nodeAllocatableMilliCPU2, nodeAvailableMilliCPU2)
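+		// Sizing note for the code below: testPod3 asks for the currently available CPU plus half of testPod1's
+		// original request, so it cannot fit while testPod1 still holds its full request. Resizing testPod1 down
+		// to a third of its original request frees roughly two thirds of that request, more than the extra half
+		// testPod3 needs, so testPod3 can then be scheduled.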
+		testPod3CPUQuantity := resource.NewMilliQuantity(nodeAvailableMilliCPU2+testPod1CPUQuantity.MilliValue()/2, resource.DecimalSI)
+		testPod1CPUQuantityResized := resource.NewMilliQuantity(testPod1CPUQuantity.MilliValue()/3, resource.DecimalSI)
+		framework.Logf("TEST2: testPod1 MilliCPUs after resize '%dm'", testPod1CPUQuantityResized.MilliValue())
+
+		c3 := []TestContainerInfo{
+			{
+				Name: "c3",
+				Resources: &ContainerResources{CPUReq: testPod3CPUQuantity.String(), CPULim: testPod3CPUQuantity.String()},
+			},
+		}
+		patchTestpod1ToMakeSpaceForPod3 := fmt.Sprintf(`{
+				"spec": {
+					"containers": [
+						{
+							"name": "c1",
+							"resources": {"requests": {"cpu": "%dm"},"limits": {"cpu": "%dm"}}
+						}
+					]
+				}
+			}`, testPod1CPUQuantityResized.MilliValue(), testPod1CPUQuantityResized.MilliValue())
+
+		tStamp = strconv.Itoa(time.Now().Nanosecond())
+		initDefaultResizePolicy(c3)
+		testPod3 := makeTestPod(f.Namespace.Name, "testpod3", tStamp, c3)
+		e2epod.SetNodeAffinity(&testPod3.Spec, node.Name)
+
+		ginkgo.By(fmt.Sprintf("TEST2: Create testPod3 '%s' that cannot fit node '%s' due to insufficient CPU.", testPod3.Name, node.Name))
+		testPod3 = podClient.Create(ctx, testPod3)
+		p3Err := e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, testPod3.Name, testPod3.Namespace)
+		framework.ExpectNoError(p3Err, "failed to create pod3 or pod3 did not become pending!")
+		framework.ExpectEqual(testPod3.Status.Phase, v1.PodPending)
+
+		ginkgo.By(fmt.Sprintf("TEST2: Resize pod '%s' to make enough space for pod '%s'", testPod1.Name, testPod3.Name))
+		testPod1, p1Err := f.ClientSet.CoreV1().Pods(testPod1.Namespace).Patch(context.TODO(),
+			testPod1.Name, types.StrategicMergePatchType, []byte(patchTestpod1ToMakeSpaceForPod3), metav1.PatchOptions{})
+		framework.ExpectNoError(p1Err, "failed to patch pod for resize")
+
+		ginkgo.By(fmt.Sprintf("TEST2: Verify pod '%s' is running after successfully resizing pod '%s'", testPod3.Name, testPod1.Name))
+		framework.Logf("TEST2: Pod '%s' CPU requests '%dm'", testPod1.Name, testPod1.Spec.Containers[0].Resources.Requests.Cpu().MilliValue())
+		framework.Logf("TEST2: Pod '%s' CPU requests '%dm'", testPod2.Name, testPod2.Spec.Containers[0].Resources.Requests.Cpu().MilliValue())
+		framework.Logf("TEST2: Pod '%s' CPU requests '%dm'", testPod3.Name, testPod3.Spec.Containers[0].Resources.Requests.Cpu().MilliValue())
+		framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, testPod3))
+
+		ginkgo.By("deleting pods")
+		delErr1 := e2epod.DeletePodWithWait(ctx, f.ClientSet, testPod1)
+		framework.ExpectNoError(delErr1, "failed to delete pod %s", testPod1.Name)
+		delErr2 := e2epod.DeletePodWithWait(ctx, f.ClientSet, testPod2)
+		framework.ExpectNoError(delErr2, "failed to delete pod %s", testPod2.Name)
+		delErr3 := e2epod.DeletePodWithWait(ctx, f.ClientSet, testPod3)
+		framework.ExpectNoError(delErr3, "failed to delete pod %s", testPod3.Name)
+	})
+}
+
+var _ = SIGDescribe("[Serial] Pod InPlace Resize Container (scheduler-focussed) [Feature:InPlacePodVerticalScaling]", func() {
+	doPodResizeSchedulerTests()
+})
+
 var _ = SIGDescribe("Pod InPlace Resize Container [Feature:InPlacePodVerticalScaling]", func() {
 	doPodResizeTests()
 	doPodResizeResourceQuotaTests()