Merge pull request #95479 from mgjeong/feature-memory-manager-working

Memory manager
Kubernetes Prow Robot 2021-02-09 03:40:56 -08:00 committed by GitHub
commit 943e67c01f
46 changed files with 8462 additions and 28 deletions


@ -392,6 +392,7 @@ API rule violation: list_type_missing,k8s.io/kubelet/config/v1alpha1,CredentialP
API rule violation: list_type_missing,k8s.io/kubelet/config/v1beta1,KubeletConfiguration,AllowedUnsafeSysctls
API rule violation: list_type_missing,k8s.io/kubelet/config/v1beta1,KubeletConfiguration,ClusterDNS
API rule violation: list_type_missing,k8s.io/kubelet/config/v1beta1,KubeletConfiguration,EnforceNodeAllocatable
API rule violation: list_type_missing,k8s.io/kubelet/config/v1beta1,KubeletConfiguration,ReservedMemory
API rule violation: list_type_missing,k8s.io/kubelet/config/v1beta1,KubeletConfiguration,TLSCipherSuites
API rule violation: list_type_missing,k8s.io/metrics/pkg/apis/metrics/v1alpha1,PodMetrics,Containers
API rule violation: list_type_missing,k8s.io/metrics/pkg/apis/metrics/v1beta1,PodMetrics,Containers


@ -550,4 +550,9 @@ Runtime log sanitization may introduce significant computation overhead and ther
// Graduated experimental flags, kept for backward compatibility
fs.BoolVar(&c.KernelMemcgNotification, "experimental-kernel-memcg-notification", c.KernelMemcgNotification, "Use kernelMemcgNotification configuration; this flag will be removed in 1.23.")
// Memory Manager Flags
fs.StringVar(&c.MemoryManagerPolicy, "memory-manager-policy", c.MemoryManagerPolicy, "Memory Manager policy to use. Possible values: 'None', 'Static'. Default: 'None'")
// TODO: once documentation link is available, replace KEP link with the documentation one.
fs.Var(&utilflag.ReservedMemoryVar{Value: &c.ReservedMemory}, "reserved-memory", "A comma-separated list of memory reservations for NUMA nodes. (e.g. --reserved-memory 0:memory=1Gi,hugepages-1M=2Gi --reserved-memory 1:memory=2Gi). The total sum for each memory type should be equal to the sum of kube-reserved, system-reserved and eviction-threshold. See more details under https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/1769-memory-manager#reserved-memory-flag")
}
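For illustration, a value such as 0:memory=1Gi,hugepages-1M=2Gi decomposes into a NUMA node ID plus a resource list. The following is a minimal sketch of that decomposition only, not the actual utilflag.ReservedMemoryVar implementation; the parseReservedMemoryEntry helper and the local memoryReservation type are hypothetical stand-ins:

package main

import (
	"fmt"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// memoryReservation mirrors the shape of the kubelet's MemoryReservation for this sketch.
type memoryReservation struct {
	NumaNode int32
	Limits   v1.ResourceList
}

// parseReservedMemoryEntry splits "<numa-node>:<type>=<quantity>[,<type>=<quantity>...]".
func parseReservedMemoryEntry(value string) (*memoryReservation, error) {
	parts := strings.SplitN(value, ":", 2)
	if len(parts) != 2 {
		return nil, fmt.Errorf("expected <numa-node>:<limits>, got %q", value)
	}
	node, err := strconv.ParseInt(parts[0], 10, 32)
	if err != nil {
		return nil, err
	}
	limits := v1.ResourceList{}
	for _, limit := range strings.Split(parts[1], ",") {
		kv := strings.SplitN(limit, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("expected <type>=<quantity>, got %q", limit)
		}
		q, err := resource.ParseQuantity(kv[1])
		if err != nil {
			return nil, err
		}
		limits[v1.ResourceName(kv[0])] = q
	}
	return &memoryReservation{NumaNode: int32(node), Limits: limits}, nil
}

func main() {
	r, err := parseReservedMemoryEntry("0:memory=1Gi,hugepages-2Mi=2Gi")
	if err != nil {
		panic(err)
	}
	fmt.Printf("NUMA node %d reserves %v\n", r.NumaNode, r.Limits)
}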


@ -687,6 +687,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
@ -735,6 +736,8 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,


@ -123,6 +123,12 @@ const (
// Enable resource managers to make NUMA aligned decisions
TopologyManager featuregate.Feature = "TopologyManager"
// owner: @cynepco3hahue(alukiano) @cezaryzukowski @k-wiatrzyk
// alpha: v1.20
// Allows setting memory affinity for a container based on NUMA topology
MemoryManager featuregate.Feature = "MemoryManager"
// owner: @sjenning
// beta: v1.11
//
@ -697,6 +703,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ExpandInUsePersistentVolumes: {Default: true, PreRelease: featuregate.Beta},
ExpandCSIVolumes: {Default: true, PreRelease: featuregate.Beta},
CPUManager: {Default: true, PreRelease: featuregate.Beta},
MemoryManager: {Default: false, PreRelease: featuregate.Alpha},
CPUCFSQuotaPeriod: {Default: false, PreRelease: featuregate.Alpha},
TopologyManager: {Default: true, PreRelease: featuregate.Beta},
ServiceNodeExclusion: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.22
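Since the MemoryManager gate defaults to false at this alpha stage, trying the feature requires enabling it explicitly, e.g. starting the kubelet with --feature-gates=MemoryManager=true together with --memory-manager-policy=Static.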


@ -62,6 +62,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.KernelMemcgNotification = false
obj.MaxOpenFiles = 1000000
obj.MaxPods = 110
obj.MemoryManagerPolicy = v1beta1.NoneMemoryManagerPolicy
obj.PodPidsLimit = -1
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute}


@ -206,6 +206,7 @@ var (
"StaticPodURLHeader[*][*]",
"MaxOpenFiles",
"MaxPods",
"MemoryManagerPolicy",
"NodeLeaseDurationSeconds",
"NodeStatusMaxImages",
"NodeStatusUpdateFrequency.Duration",
@ -220,6 +221,14 @@ var (
"ReadOnlyPort",
"RegistryBurst",
"RegistryPullQPS",
"ReservedMemory[*].Limits[*].Format",
"ReservedMemory[*].Limits[*].d.Dec.scale",
"ReservedMemory[*].Limits[*].d.Dec.unscaled.abs[*]",
"ReservedMemory[*].Limits[*].d.Dec.unscaled.neg",
"ReservedMemory[*].Limits[*].i.scale",
"ReservedMemory[*].Limits[*].i.value",
"ReservedMemory[*].Limits[*].s",
"ReservedMemory[*].NumaNode",
"ReservedSystemCPUs",
"RuntimeRequestTimeout.Duration",
"RunOnce",


@ -55,6 +55,7 @@ logging:
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
memoryManagerPolicy: None
nodeLeaseDurationSeconds: 40
nodeStatusMaxImages: 50
nodeStatusReportFrequency: 5m0s


@ -55,6 +55,7 @@ logging:
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
memoryManagerPolicy: None
nodeLeaseDurationSeconds: 40
nodeStatusMaxImages: 50
nodeStatusReportFrequency: 5m0s


@ -224,6 +224,9 @@ type KubeletConfiguration struct {
// CPU Manager reconciliation period.
// Requires the CPUManager feature gate to be enabled.
CPUManagerReconcilePeriod metav1.Duration
// MemoryManagerPolicy is the name of the policy to use.
// Requires the MemoryManager feature gate to be enabled.
MemoryManagerPolicy string
// TopologyManagerPolicy is the name of the policy to use.
// Policies other than "none" require the TopologyManager feature gate to be enabled.
TopologyManagerPolicy string
@ -382,6 +385,20 @@ type KubeletConfiguration struct {
// Defaults to 10 seconds, requires GracefulNodeShutdown feature gate to be enabled.
// For example, if ShutdownGracePeriod=30s, and ShutdownGracePeriodCriticalPods=10s, during a node shutdown the first 20 seconds would be reserved for gracefully terminating normal pods, and the last 10 seconds would be reserved for terminating critical pods.
ShutdownGracePeriodCriticalPods metav1.Duration
// ReservedMemory specifies a comma-separated list of memory reservations for NUMA nodes.
// The parameter makes sense only in the context of the memory manager feature. The memory manager will not allocate reserved memory for container workloads.
// For example, if you have a NUMA0 with 10Gi of memory and ReservedMemory was specified to reserve 1Gi of memory at NUMA0,
// the memory manager will assume that only 9Gi is available for allocation.
// You can specify different amounts of memory for different NUMA nodes and memory types.
// You can omit this parameter entirely, but you should be aware that the amount of reserved memory from all NUMA nodes
// should be equal to the amount of memory specified by the node allocatable features (https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/#node-allocatable).
// If at least one node allocatable parameter has a non-zero value, you will need to specify at least one NUMA node.
// Also, avoid specifying:
// 1. Duplicates, i.e. the same NUMA node and memory type, but with a different value.
// 2. Zero limits for any memory type.
// 3. NUMA node IDs that do not exist on the machine.
// 4. Memory types other than memory and hugepages-<size>.
ReservedMemory []MemoryReservation
}
// KubeletAuthorizationMode denotes the authorization mode for the kubelet
@ -535,3 +552,9 @@ type ExecEnvVar struct {
Name string
Value string
}
// MemoryReservation specifies the memory reservation of different types for each NUMA node
type MemoryReservation struct {
NumaNode int32
Limits v1.ResourceList
}
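To make the rules above concrete, a reservation list that satisfies them might look like the following sketch (the two-node layout and all quantities are hypothetical):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

func main() {
	// Two NUMA nodes, non-zero limits, no duplicate (node, type) pairs,
	// and only the supported types memory and hugepages-<size>.
	reservedMemory := []kubeletconfig.MemoryReservation{
		{
			NumaNode: 0,
			Limits: v1.ResourceList{
				v1.ResourceMemory:                resource.MustParse("1Gi"),
				v1.ResourceName("hugepages-2Mi"): resource.MustParse("2Gi"),
			},
		},
		{
			NumaNode: 1,
			Limits: v1.ResourceList{
				v1.ResourceMemory: resource.MustParse("2Gi"),
			},
		},
	}
	fmt.Printf("%+v\n", reservedMemory)
}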


@ -19,10 +19,12 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1",
deps = [
"//pkg/apis/core/v1:go_default_library",
"//pkg/cluster/ports:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",


@ -23,6 +23,7 @@ import (
kruntime "k8s.io/apimachinery/pkg/runtime"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
// TODO: Cut references to k8s.io/kubernetes, eventually there should be none from this package
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/kubelet/qos"
@ -154,6 +155,9 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
// Keep the same as default NodeStatusUpdateFrequency
obj.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second}
}
if obj.MemoryManagerPolicy == "" {
obj.MemoryManagerPolicy = kubeletconfigv1beta1.NoneMemoryManagerPolicy
}
if obj.TopologyManagerPolicy == "" {
obj.TopologyManagerPolicy = kubeletconfigv1beta1.NoneTopologyManagerPolicy
}


@ -23,6 +23,7 @@ package v1beta1
import (
unsafe "unsafe"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
@ -108,6 +109,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1beta1.MemoryReservation)(nil), (*config.MemoryReservation)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta1_MemoryReservation_To_config_MemoryReservation(a.(*v1beta1.MemoryReservation), b.(*config.MemoryReservation), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.MemoryReservation)(nil), (*v1beta1.MemoryReservation)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_MemoryReservation_To_v1beta1_MemoryReservation(a.(*config.MemoryReservation), b.(*v1beta1.MemoryReservation), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1beta1.SerializedNodeConfigSource)(nil), (*config.SerializedNodeConfigSource)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta1_SerializedNodeConfigSource_To_config_SerializedNodeConfigSource(a.(*v1beta1.SerializedNodeConfigSource), b.(*config.SerializedNodeConfigSource), scope)
}); err != nil {
@ -274,6 +285,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
out.CgroupDriver = in.CgroupDriver
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.MemoryManagerPolicy = in.MemoryManagerPolicy
out.TopologyManagerPolicy = in.TopologyManagerPolicy
out.TopologyManagerScope = in.TopologyManagerScope
out.QOSReserved = *(*map[string]string)(unsafe.Pointer(&in.QOSReserved))
@ -352,6 +364,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
}
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
out.ReservedMemory = *(*[]config.MemoryReservation)(unsafe.Pointer(&in.ReservedMemory))
return nil
}
@ -429,6 +442,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
out.CgroupDriver = in.CgroupDriver
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.MemoryManagerPolicy = in.MemoryManagerPolicy
out.TopologyManagerPolicy = in.TopologyManagerPolicy
out.TopologyManagerScope = in.TopologyManagerScope
out.QOSReserved = *(*map[string]string)(unsafe.Pointer(&in.QOSReserved))
@ -505,6 +519,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
}
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
out.ReservedMemory = *(*[]v1beta1.MemoryReservation)(unsafe.Pointer(&in.ReservedMemory))
return nil
}
@ -581,6 +596,28 @@ func Convert_config_KubeletX509Authentication_To_v1beta1_KubeletX509Authenticati
return autoConvert_config_KubeletX509Authentication_To_v1beta1_KubeletX509Authentication(in, out, s)
}
func autoConvert_v1beta1_MemoryReservation_To_config_MemoryReservation(in *v1beta1.MemoryReservation, out *config.MemoryReservation, s conversion.Scope) error {
out.NumaNode = in.NumaNode
out.Limits = *(*corev1.ResourceList)(unsafe.Pointer(&in.Limits))
return nil
}
// Convert_v1beta1_MemoryReservation_To_config_MemoryReservation is an autogenerated conversion function.
func Convert_v1beta1_MemoryReservation_To_config_MemoryReservation(in *v1beta1.MemoryReservation, out *config.MemoryReservation, s conversion.Scope) error {
return autoConvert_v1beta1_MemoryReservation_To_config_MemoryReservation(in, out, s)
}
func autoConvert_config_MemoryReservation_To_v1beta1_MemoryReservation(in *config.MemoryReservation, out *v1beta1.MemoryReservation, s conversion.Scope) error {
out.NumaNode = in.NumaNode
out.Limits = *(*corev1.ResourceList)(unsafe.Pointer(&in.Limits))
return nil
}
// Convert_config_MemoryReservation_To_v1beta1_MemoryReservation is an autogenerated conversion function.
func Convert_config_MemoryReservation_To_v1beta1_MemoryReservation(in *config.MemoryReservation, out *v1beta1.MemoryReservation, s conversion.Scope) error {
return autoConvert_config_MemoryReservation_To_v1beta1_MemoryReservation(in, out, s)
}
func autoConvert_v1beta1_SerializedNodeConfigSource_To_config_SerializedNodeConfigSource(in *v1beta1.SerializedNodeConfigSource, out *config.SerializedNodeConfigSource, s conversion.Scope) error {
out.Source = in.Source
return nil


@ -23,6 +23,7 @@ package v1beta1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
v1beta1 "k8s.io/kubelet/config/v1beta1"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
)
// RegisterDefaults adds defaulters functions to the given scheme.
@ -35,4 +36,8 @@ func RegisterDefaults(scheme *runtime.Scheme) error {
func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
SetDefaults_KubeletConfiguration(in)
for i := range in.ReservedMemory {
a := &in.ReservedMemory[i]
v1.SetDefaults_ResourceList(&a.Limits)
}
}


@ -11,14 +11,17 @@ go_library(
srcs = [
"validation.go",
"validation_others.go",
"validation_reserved_memory.go",
"validation_windows.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/config/validation",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
@ -43,10 +46,15 @@ filegroup(
go_test(
name = "go_default_test",
-srcs = ["validation_test.go"],
+srcs = [
+"validation_reserved_memory_test.go",
+"validation_test.go",
+],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
],


@ -193,6 +193,8 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error
}
}
allErrors = append(allErrors, validateReservedMemoryConfiguration(kc)...)
if err := validateKubeletOSConfiguration(kc); err != nil {
allErrors = append(allErrors, err)
}


@ -0,0 +1,64 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validation
import (
"fmt"
v1 "k8s.io/api/core/v1"
corev1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
// validateReservedMemoryConfiguration validates the reserved memory configuration and returns a list of errors if it is invalid.
func validateReservedMemoryConfiguration(kc *kubeletconfig.KubeletConfiguration) []error {
if len(kc.ReservedMemory) == 0 {
return nil
}
var errors []error
numaTypeDuplicates := map[int32]map[v1.ResourceName]bool{}
for _, reservedMemory := range kc.ReservedMemory {
numaNode := reservedMemory.NumaNode
if _, ok := numaTypeDuplicates[numaNode]; !ok {
numaTypeDuplicates[numaNode] = map[v1.ResourceName]bool{}
}
for resourceName, q := range reservedMemory.Limits {
if !reservedMemorySupportedLimit(resourceName) {
errors = append(errors, fmt.Errorf("the limit type %q for NUMA node %d is not supported, only %v is accepted", resourceName, numaNode, []v1.ResourceName{v1.ResourceMemory, v1.ResourceHugePagesPrefix + "<HugePageSize>"}))
}
// validates that the limit has non-zero value
if q.IsZero() {
errors = append(errors, fmt.Errorf("reserved memory may not be zero for NUMA node %d and resource %q", numaNode, resourceName))
}
// validates that no duplication for NUMA node and limit type occurred
if _, ok := numaTypeDuplicates[numaNode][resourceName]; ok {
errors = append(errors, fmt.Errorf("the reserved memory has a duplicate value for NUMA node %d and resource %q", numaNode, resourceName))
}
numaTypeDuplicates[numaNode][resourceName] = true
}
}
return errors
}
func reservedMemorySupportedLimit(resourceName v1.ResourceName) bool {
return corev1helper.IsHugePageResourceName(resourceName) || resourceName == v1.ResourceMemory
}


@ -0,0 +1,120 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validation
import (
"fmt"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
func TestValidateReservedMemoryConfiguration(t *testing.T) {
testCases := []struct {
description string
kubeletConfiguration *kubeletconfig.KubeletConfiguration
expectedError error
}{
{
description: "The kubelet configuration does not have reserved memory parameter",
kubeletConfiguration: &kubeletconfig.KubeletConfiguration{},
expectedError: nil,
},
{
description: "The kubelet configuration has valid reserved memory parameter",
kubeletConfiguration: &kubeletconfig.KubeletConfiguration{
ReservedMemory: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(128, resource.DecimalSI),
},
},
},
},
expectedError: nil,
},
{
description: "The reserved memory has duplications for the NUMA node and limit type",
kubeletConfiguration: &kubeletconfig.KubeletConfiguration{
ReservedMemory: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(128, resource.DecimalSI),
},
},
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(64, resource.DecimalSI),
},
},
},
},
expectedError: fmt.Errorf("the reserved memory has a duplicate value for NUMA node %d and resource %q", 0, v1.ResourceMemory),
},
{
description: "The reserved memory has unsupported limit type",
kubeletConfiguration: &kubeletconfig.KubeletConfiguration{
ReservedMemory: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
"blabla": *resource.NewQuantity(128, resource.DecimalSI),
},
},
},
},
expectedError: fmt.Errorf("the limit type %q for NUMA node %d is not supported, only [memory hugepages-<HugePageSize>] is accepted", "blabla", 0),
},
{
description: "The reserved memory has limit type with zero value",
kubeletConfiguration: &kubeletconfig.KubeletConfiguration{
ReservedMemory: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
},
expectedError: fmt.Errorf("reserved memory may not be zero for NUMA node %d and resource %q", 0, v1.ResourceMemory),
},
}
for _, testCase := range testCases {
errors := validateReservedMemoryConfiguration(testCase.kubeletConfiguration)
if len(errors) != 0 && testCase.expectedError == nil {
t.Errorf("expected no errors, got %v", errors)
}
if testCase.expectedError != nil {
if len(errors) == 0 {
t.Errorf("expected error %v, got none", testCase.expectedError)
} else if errors[0].Error() != testCase.expectedError.Error() {
t.Errorf("expected error %v, got %v", testCase.expectedError, errors[0])
}
}
}
}


@ -21,6 +21,7 @@ limitations under the License.
package config
import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@ -273,6 +274,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
out.Logging = in.Logging
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
if in.ReservedMemory != nil {
in, out := &in.ReservedMemory, &out.ReservedMemory
*out = make([]MemoryReservation, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
@ -345,6 +353,29 @@ func (in *KubeletX509Authentication) DeepCopy() *KubeletX509Authentication {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MemoryReservation) DeepCopyInto(out *MemoryReservation) {
*out = *in
if in.Limits != nil {
in, out := &in.Limits, &out.Limits
*out = make(corev1.ResourceList, len(*in))
for key, val := range *in {
(*out)[key] = val.DeepCopy()
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MemoryReservation.
func (in *MemoryReservation) DeepCopy() *MemoryReservation {
if in == nil {
return nil
}
out := new(MemoryReservation)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SerializedNodeConfigSource) DeepCopyInto(out *SerializedNodeConfigSource) {
*out = *in


@ -31,8 +31,10 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/cm/memorymanager:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
@ -240,6 +242,7 @@ filegroup(
"//pkg/kubelet/cm/cpumanager:all-srcs",
"//pkg/kubelet/cm/cpuset:all-srcs",
"//pkg/kubelet/cm/devicemanager:all-srcs",
"//pkg/kubelet/cm/memorymanager:all-srcs",
"//pkg/kubelet/cm/topologymanager:all-srcs",
"//pkg/kubelet/cm/util:all-srcs",
],


@ -17,6 +17,9 @@ limitations under the License.
package cm
import (
"fmt"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
@ -24,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -32,10 +36,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"fmt"
"strconv"
"strings"
)
type ActivePodsFunc func() []*v1.Pod
@ -135,6 +135,8 @@ type NodeConfig struct {
ExperimentalCPUManagerPolicy string
ExperimentalTopologyManagerScope string
ExperimentalCPUManagerReconcilePeriod time.Duration
ExperimentalMemoryManagerPolicy string
ExperimentalMemoryManagerReservedMemory []kubeletconfig.MemoryReservation
ExperimentalPodPidsLimit int64
EnforceCPULimits bool
CPUCFSQuotaPeriod time.Duration


@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -138,6 +139,8 @@ type containerManagerImpl struct {
deviceManager devicemanager.Manager
// Interface for CPU affinity management.
cpuManager cpumanager.Manager
// Interface for memory affinity management.
memoryManager memorymanager.Manager
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
}
@ -341,6 +344,22 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager.AddHintProvider(cm.cpuManager)
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.Errorf("failed to initialize memory manager: %v", err)
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
}
return cm, nil
}
@ -364,7 +383,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
}
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
-return &internalContainerLifecycleImpl{cm.cpuManager, cm.topologyManager}
+return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
}
// Create a cgroup container manager.
@ -606,6 +625,18 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
}
// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap, err := buildContainerMapFromRuntime(runtimeService)
if err != nil {
return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
}
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
}
}
// cache the node Info including resource capacity and
// allocatable of the node
cm.nodeInfo = node
@ -706,11 +737,12 @@ func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.
// work as we add more and more hint providers that the TopologyManager
// needs to call Allocate() on (that may not be directly instantiated
// inside this component).
-return &resourceAllocator{cm.cpuManager, cm.deviceManager}
+return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager}
}
type resourceAllocator struct {
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
deviceManager devicemanager.Manager
}
@ -737,6 +769,17 @@ func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle
}
}
}
if m.memoryManager != nil {
err = m.memoryManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
}
}
return lifecycle.PodAdmitResult{Admit: true}


@ -24,6 +24,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -102,7 +103,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulerframework.NodeIn
}
func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
-return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
+return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
}
func (cm *containerManagerStub) GetPodCgroupRoot() string {


@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -208,7 +209,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
}
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
-return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
+return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
}
func (cm *containerManagerImpl) GetPodCgroupRoot() string {


@ -25,6 +25,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -156,7 +157,7 @@ func (cm *FakeContainerManager) InternalContainerLifecycle() InternalContainerLi
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "InternalContainerLifecycle")
-return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
+return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
}
func (cm *FakeContainerManager) GetPodCgroupRoot() string {


@ -22,6 +22,7 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@ -35,6 +36,7 @@ type InternalContainerLifecycle interface {
// Implements InternalContainerLifecycle interface.
type internalContainerLifecycleImpl struct {
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
topologyManager topologymanager.Manager
}
@ -43,6 +45,13 @@ func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, containe
i.cpuManager.AddContainer(pod, container, containerID)
}
if i.memoryManager != nil {
err := i.memoryManager.AddContainer(pod, container, containerID)
if err != nil {
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
err := i.topologyManager.AddContainer(pod, containerID)
if err != nil {


@ -0,0 +1,73 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"fake_memory_manager.go",
"memory_manager.go",
"policy.go",
"policy_none.go",
"policy_static.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm/containermap:go_default_library",
"//pkg/kubelet/cm/memorymanager/state:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/cm/topologymanager/bitmask:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"memory_manager_test.go",
"policy_static_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm/containermap:go_default_library",
"//pkg/kubelet/cm/memorymanager/state:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/cm/topologymanager/bitmask:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/cm/memorymanager/state:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@ -0,0 +1,77 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memorymanager
import (
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/status"
)
type fakeManager struct {
state state.State
}
func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
klog.Info("[fake memorymanager] Start()")
return nil
}
func (m *fakeManager) Policy() Policy {
klog.Info("[fake memorymanager] Policy()")
return NewPolicyNone()
}
func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
klog.Infof("[fake memorymanager] Allocate (pod: %s, container: %s)", pod.Name, container.Name)
return nil
}
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
klog.Infof("[fake memorymanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
return nil
}
func (m *fakeManager) RemoveContainer(containerID string) error {
klog.Infof("[fake memorymanager] RemoveContainer (container id: %s)", containerID)
return nil
}
func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
klog.Infof("[fake memorymanager] Get Topology Hints")
return map[string][]topologymanager.TopologyHint{}
}
func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
klog.Infof("[fake memorymanager] Get Pod Topology Hints")
return map[string][]topologymanager.TopologyHint{}
}
func (m *fakeManager) State() state.Reader {
return m.state
}
// NewFakeManager creates empty/fake memory manager
func NewFakeManager() Manager {
return &fakeManager{
state: state.NewMemoryState(),
}
}


@ -0,0 +1,415 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memorymanager
import (
"fmt"
"strconv"
"strings"
"sync"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
corev1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/status"
)
// memoryManagerStateFileName is the file name where memory manager stores its state
const memoryManagerStateFileName = "memory_manager_state"
// ActivePodsFunc is a function that returns a list of active pods
type ActivePodsFunc func() []*v1.Pod
type runtimeService interface {
UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error
}
type sourcesReadyStub struct{}
func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// Manager interface provides methods for Kubelet to manage pod memory.
type Manager interface {
// Start is called during Kubelet initialization.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
// AddContainer is called between container create and container start
// so that initial memory affinity settings can be written through to the
// container runtime before the first process begins to execute.
AddContainer(p *v1.Pod, c *v1.Container, containerID string) error
// Allocate is called to pre-allocate memory resources during Pod admission.
// This must be called at some point prior to the AddContainer() call for a container, e.g. at pod admission time.
Allocate(pod *v1.Pod, container *v1.Container) error
// RemoveContainer is called after Kubelet decides to kill or delete a
// container. After this call, any memory allocated to the container is freed.
RemoveContainer(containerID string) error
// State returns a read-only interface to the internal memory manager state.
State() state.Reader
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
}
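The comments above imply a fixed call order: Allocate at pod admission, AddContainer between container create and start, and RemoveContainer once the container is gone. A minimal sketch of that order, using the fake manager from this diff (the pod, container, and container ID values are made up):

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
)

func main() {
	// The fake manager only logs, which makes the call order visible.
	mgr := memorymanager.NewFakeManager()

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", UID: "uid-1"}}
	container := &v1.Container{Name: "app"}

	// 1. Pod admission: reserve memory before the container exists.
	_ = mgr.Allocate(pod, container)

	// 2. Between container create and start: write memory affinity through
	//    to the runtime (the container ID here is made up).
	_ = mgr.AddContainer(pod, container, "container-id-1")

	// 3. After the kubelet kills or deletes the container: free the reservation.
	_ = mgr.RemoveContainer("container-id-1")
}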
type manager struct {
sync.Mutex
policy Policy
// state allows restoring information regarding memory allocation for guaranteed pods
// in the case of a kubelet restart
state state.State
// containerRuntime is the container runtime service interface needed
// to make UpdateContainerResources() calls against the containers.
containerRuntime runtimeService
// activePods is a method for listing active pods on the node
// so all the containers can be updated during a call to removeStaleState.
activePods ActivePodsFunc
// podStatusProvider provides a method for obtaining pod statuses
// and the containerID of their containers
podStatusProvider status.PodStatusProvider
// containerMap provides a mapping from (pod, container) -> containerID
// for all containers a pod
containerMap containermap.ContainerMap
// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
// We use it to determine when we can purge inactive pods from checkpointed state.
sourcesReady config.SourcesReady
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string
}
var _ Manager = &manager{}
// NewManager returns new instance of the memory manager
func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var policy Policy
switch policyType(policyName) {
case policyTypeNone:
policy = NewPolicyNone()
case policyTypeStatic:
systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
if err != nil {
return nil, err
}
policy, err = NewPolicyStatic(machineInfo, systemReserved, affinity)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown policy: \"%s\"", policyName)
}
manager := &manager{
policy: policy,
stateFileDirectory: stateFileDirectory,
}
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
}
// Start starts the memory manager under the kubelet and calls policy start
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
klog.Infof("[memorymanager] starting with %s policy", m.policy.Name())
m.sourcesReady = sourcesReady
m.activePods = activePods
m.podStatusProvider = podStatusProvider
m.containerRuntime = containerRuntime
m.containerMap = initialContainers
stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name())
if err != nil {
klog.Errorf("[memorymanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
return err
}
m.state = stateImpl
err = m.policy.Start(m.state)
if err != nil {
klog.Errorf("[memorymanager] policy start error: %v", err)
return err
}
return nil
}
// AddContainer saves the value of requested memory for the guaranteed pod under the state and sets memory affinity according to the topology manager
func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
m.Lock()
m.containerMap.Add(string(pod.UID), container.Name, containerID)
m.Unlock()
// Get NUMA node affinity of blocks assigned to the container during Allocate()
var nodes []string
for _, block := range m.state.GetMemoryBlocks(string(pod.UID), container.Name) {
for _, nodeID := range block.NUMAAffinity {
nodes = append(nodes, strconv.Itoa(nodeID))
}
}
if len(nodes) < 1 {
klog.V(5).Infof("[memorymanager] skipping update of container resources because memory blocks are empty")
return nil
}
affinity := strings.Join(nodes, ",")
klog.Infof("[memorymanager] Set container %q cpuset.mems to %q", containerID, affinity)
err := m.containerRuntime.UpdateContainerResources(containerID, &runtimeapi.LinuxContainerResources{CpusetMems: affinity})
if err != nil {
klog.Errorf("[memorymanager] AddContainer error: error updating cpuset.mems for container (pod: %s, container: %s, container id: %s, err: %v)", pod.Name, container.Name, containerID, err)
m.Lock()
err = m.policyRemoveContainerByRef(string(pod.UID), container.Name)
if err != nil {
klog.Errorf("[memorymanager] AddContainer rollback state error: %v", err)
}
m.Unlock()
}
return err
}
// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
// Garbage collect any stranded resources before allocation
m.removeStaleState()
m.Lock()
defer m.Unlock()
// Call down into the policy to assign this container memory if required.
if err := m.policy.Allocate(m.state, pod, container); err != nil {
klog.Errorf("[memorymanager] Allocate error: %v", err)
return err
}
return nil
}
// RemoveContainer removes the container from the state
func (m *manager) RemoveContainer(containerID string) error {
m.Lock()
defer m.Unlock()
// if an error appears, it means the container entry no longer exists under the container map
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
if err != nil {
klog.Warningf("[memorymanager] Failed to get container %s from container map error: %v", containerID, err)
return nil
}
err = m.policyRemoveContainerByRef(podUID, containerName)
if err != nil {
klog.Errorf("[memorymanager] RemoveContainer error: %v", err)
return err
}
return nil
}
// State returns the state of the manager
func (m *manager) State() state.Reader {
return m.state
}
// GetPodTopologyHints returns the topology hints for the topology manager
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetPodTopologyHints(m.state, pod)
}
// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetTopologyHints(m.state, pod, container)
}
// TODO: move the method to the upper level, to re-use it under the CPU and memory managers
func (m *manager) removeStaleState() {
// Only once all sources are ready do we attempt to remove any stale state.
// This ensures that the call to `m.activePods()` below will succeed with
// the actual active pods list.
if !m.sourcesReady.AllReady() {
return
}
// We grab the lock to ensure that no new containers will grab memory block while
// executing the code below. Without this lock, its possible that we end up
// removing state that is newly added by an asynchronous call to
// AddContainer() during the execution of this code.
m.Lock()
defer m.Unlock()
// Get the list of active pods.
activePods := m.activePods()
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
}
}
// Loop through the MemoryManager state. Remove any state for containers not
// in the `activeContainers` list built above.
assignments := m.state.GetMemoryAssignments()
for podUID := range assignments {
for containerName := range assignments[podUID] {
if _, ok := activeContainers[podUID][containerName]; !ok {
klog.Infof("[memorymanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
err := m.policyRemoveContainerByRef(podUID, containerName)
if err != nil {
klog.Errorf("[memorymanager] removeStaleState: failed to remove (pod %s, container %s), error: %v", podUID, containerName, err)
}
}
}
}
}
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
err := m.policy.RemoveContainer(m.state, podUID, containerName)
if err == nil {
m.containerMap.RemoveByContainerRef(podUID, containerName)
}
return err
}
func getTotalMemoryTypeReserved(machineInfo *cadvisorapi.MachineInfo, reservedMemory []kubeletconfig.MemoryReservation) (map[v1.ResourceName]resource.Quantity, error) {
totalMemoryType := map[v1.ResourceName]resource.Quantity{}
numaNodes := map[int]bool{}
for _, numaNode := range machineInfo.Topology {
numaNodes[numaNode.Id] = true
}
for _, reservation := range reservedMemory {
if !numaNodes[int(reservation.NumaNode)] {
return nil, fmt.Errorf("the reserved memory configuration references a NUMA node %d that does not exist on this machine", reservation.NumaNode)
}
for resourceName, q := range reservation.Limits {
if value, ok := totalMemoryType[resourceName]; ok {
q.Add(value)
}
totalMemoryType[resourceName] = q
}
}
return totalMemoryType, nil
}
func validateReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation) error {
totalMemoryType, err := getTotalMemoryTypeReserved(machineInfo, reservedMemory)
if err != nil {
return err
}
commonMemoryTypeSet := make(map[v1.ResourceName]bool)
for resourceType := range totalMemoryType {
commonMemoryTypeSet[resourceType] = true
}
for resourceType := range nodeAllocatableReservation {
if !(corev1helper.IsHugePageResourceName(resourceType) || resourceType == v1.ResourceMemory) {
continue
}
commonMemoryTypeSet[resourceType] = true
}
for resourceType := range commonMemoryTypeSet {
nodeAllocatableMemory := resource.NewQuantity(0, resource.DecimalSI)
if memValue, set := nodeAllocatableReservation[resourceType]; set {
nodeAllocatableMemory.Add(memValue)
}
reservedMemory := resource.NewQuantity(0, resource.DecimalSI)
if memValue, set := totalMemoryType[resourceType]; set {
reservedMemory.Add(memValue)
}
if !(*nodeAllocatableMemory).Equal(*reservedMemory) {
return fmt.Errorf("the total amount %q of type %q is not equal to the value %q determined by Node Allocatable feature", reservedMemory.String(), resourceType, nodeAllocatableMemory.String())
}
}
return nil
}
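As a worked example of this equality check (all numbers hypothetical): if kube-reserved sets memory=500Mi, system-reserved sets memory=500Mi, and the hard eviction threshold is memory.available<100Mi, the node allocatable reservation for the memory resource is 1100Mi. A configuration of --reserved-memory 0:memory=600Mi --reserved-memory 1:memory=500Mi sums to exactly 1100Mi and passes, while a single --reserved-memory 0:memory=1Gi would be rejected because 1024Mi != 1100Mi.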
func convertReserved(machineInfo *cadvisorapi.MachineInfo, reservedMemory []kubeletconfig.MemoryReservation) (systemReservedMemory, error) {
reservedMemoryConverted := make(map[int]map[v1.ResourceName]uint64)
for _, node := range machineInfo.Topology {
reservedMemoryConverted[node.Id] = make(map[v1.ResourceName]uint64)
}
for _, reservation := range reservedMemory {
for resourceName, q := range reservation.Limits {
val, success := q.AsInt64()
if !success {
return nil, fmt.Errorf("could not convert a variable of type Quantity to int64")
}
reservedMemoryConverted[int(reservation.NumaNode)][resourceName] = uint64(val)
}
}
return reservedMemoryConverted, nil
}
func getSystemReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation) (systemReservedMemory, error) {
if err := validateReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory); err != nil {
return nil, err
}
reservedMemoryConverted, err := convertReserved(machineInfo, reservedMemory)
if err != nil {
return nil, err
}
return reservedMemoryConverted, nil
}

File diff suppressed because it is too large


@ -0,0 +1,44 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memorymanager
import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
// policyType defines the policy type
type policyType string
// Policy implements the logic for assigning memory to pod containers.
type Policy interface {
Name() string
Start(s state.State) error
// Allocate call is idempotent
Allocate(s state.State, pod *v1.Pod, container *v1.Container) error
// RemoveContainer call is idempotent
RemoveContainer(s state.State, podUID string, containerName string) error
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
}


@ -0,0 +1,68 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memorymanager
import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
const policyTypeNone policyType = "None"
// none is an implementation of the Policy interface for the none policy; using the none
// policy is the same as disabling memory management
type none struct{}
var _ Policy = &none{}
// NewPolicyNone returns new none policy instance
func NewPolicyNone() Policy {
return &none{}
}
func (p *none) Name() string {
return string(policyTypeNone)
}
func (p *none) Start(s state.State) error {
return nil
}
// Allocate call is idempotent
func (p *none) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
return nil
}
// RemoveContainer call is idempotent
func (p *none) RemoveContainer(s state.State, podUID string, containerName string) error {
return nil
}
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
func (p *none) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return nil
}
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
func (p *none) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}


@ -0,0 +1,769 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memorymanager
import (
"fmt"
"reflect"
"sort"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
)
const policyTypeStatic policyType = "Static"
type systemReservedMemory map[int]map[v1.ResourceName]uint64
// staticPolicy is an implementation of the policy interface for the static policy
type staticPolicy struct {
// machineInfo contains machine memory related information
machineInfo *cadvisorapi.MachineInfo
// systemReserved contains memory that is reserved for the kube and system components
systemReserved systemReservedMemory
// topology manager reference to get container Topology affinity
affinity topologymanager.Store
}
var _ Policy = &staticPolicy{}
// NewPolicyStatic returns a new static policy instance
func NewPolicyStatic(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
var totalSystemReserved uint64
for _, node := range reserved {
if _, ok := node[v1.ResourceMemory]; !ok {
continue
}
totalSystemReserved += node[v1.ResourceMemory]
}
// check if we have some reserved memory for the system
if totalSystemReserved <= 0 {
return nil, fmt.Errorf("[memorymanager] you should specify the system reserved memory")
}
return &staticPolicy{
machineInfo: machineInfo,
systemReserved: reserved,
affinity: affinity,
}, nil
}
func (p *staticPolicy) Name() string {
return string(policyTypeStatic)
}
func (p *staticPolicy) Start(s state.State) error {
if err := p.validateState(s); err != nil {
klog.Errorf("[memorymanager] Invalid state: %v, please drain node and remove policy state file", err)
return err
}
return nil
}
// Allocate call is idempotent
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
// allocate the memory only for guaranteed pods
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return nil
}
klog.Infof("[memorymanager] Allocate (pod: %s, container: %s)", pod.Name, container.Name)
if blocks := s.GetMemoryBlocks(string(pod.UID), container.Name); blocks != nil {
klog.Infof("[memorymanager] Container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
return nil
}
// Call Topology Manager to get the aligned affinity across all hint providers.
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
klog.Infof("[memorymanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
requestedResources, err := getRequestedResources(container)
if err != nil {
return err
}
bestHint := &hint
// the topology manager returned a hint with nil NUMA affinity,
// so we should use the default NUMA affinity, calculated the same way as the topology manager calculates it
if hint.NUMANodeAffinity == nil {
defaultHint, err := p.getDefaultHint(s, requestedResources)
if err != nil {
return err
}
if !defaultHint.Preferred && bestHint.Preferred {
return fmt.Errorf("[memorymanager] failed to find the default preferred hint")
}
bestHint = defaultHint
}
machineState := s.GetMachineState()
// the topology manager returned a hint that does not completely satisfy the container request,
// so we should extend this hint to one that satisfies the request and includes the current hint
if !isAffinitySatisfyRequest(machineState, bestHint.NUMANodeAffinity, requestedResources) {
extendedHint, err := p.extendTopologyManagerHint(s, requestedResources, bestHint.NUMANodeAffinity)
if err != nil {
return err
}
if !extendedHint.Preferred && bestHint.Preferred {
return fmt.Errorf("[memorymanager] failed to find the extended preferred hint")
}
bestHint = extendedHint
}
var containerBlocks []state.Block
maskBits := bestHint.NUMANodeAffinity.GetBits()
for resourceName, requestedSize := range requestedResources {
// update memory blocks
containerBlocks = append(containerBlocks, state.Block{
NUMAAffinity: maskBits,
Size: requestedSize,
Type: resourceName,
})
// Update nodes memory state
for _, nodeID := range maskBits {
machineState[nodeID].NumberOfAssignments++
machineState[nodeID].Cells = maskBits
// we need to continue to update all affinity mask nodes
if requestedSize == 0 {
continue
}
// update the node memory state
nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
if nodeResourceMemoryState.Free <= 0 {
continue
}
// the node has enough memory to satisfy the request
if nodeResourceMemoryState.Free >= requestedSize {
nodeResourceMemoryState.Reserved += requestedSize
nodeResourceMemoryState.Free -= requestedSize
requestedSize = 0
continue
}
// the node does not have enough memory, use the node remaining memory and move to the next node
requestedSize -= nodeResourceMemoryState.Free
nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
nodeResourceMemoryState.Free = 0
}
}
s.SetMachineState(machineState)
s.SetMemoryBlocks(string(pod.UID), container.Name, containerBlocks)
return nil
}
// RemoveContainer call is idempotent
func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
klog.Infof("[memorymanager] RemoveContainer (pod: %s, container: %s)", podUID, containerName)
blocks := s.GetMemoryBlocks(podUID, containerName)
if blocks == nil {
return nil
}
s.Delete(podUID, containerName)
// Mutate machine memory state to update free and reserved memory
machineState := s.GetMachineState()
for _, b := range blocks {
releasedSize := b.Size
for _, nodeID := range b.NUMAAffinity {
machineState[nodeID].NumberOfAssignments--
// once we do not have any memory allocations on this node, clear node groups
if machineState[nodeID].NumberOfAssignments == 0 {
machineState[nodeID].Cells = []int{nodeID}
}
// we still need to pass over all NUMA nodes under the affinity mask to update them
if releasedSize == 0 {
continue
}
nodeResourceMemoryState := machineState[nodeID].MemoryMap[b.Type]
// if the node does not have reserved memory to free, continue to the next node
if nodeResourceMemoryState.Reserved == 0 {
continue
}
// the reserved memory is smaller than the amount of memory that should be released;
// release as much as possible and move to the next node
if nodeResourceMemoryState.Reserved < releasedSize {
releasedSize -= nodeResourceMemoryState.Reserved
nodeResourceMemoryState.Free += nodeResourceMemoryState.Reserved
nodeResourceMemoryState.Reserved = 0
continue
}
// the reserved memory is big enough to satisfy the released memory
nodeResourceMemoryState.Free += releasedSize
nodeResourceMemoryState.Reserved -= releasedSize
releasedSize = 0
}
}
s.SetMachineState(machineState)
return nil
}
func regenerateHints(pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
hints := map[string][]topologymanager.TopologyHint{}
for resourceName := range reqRsrc {
hints[string(resourceName)] = []topologymanager.TopologyHint{}
}
if len(ctnBlocks) != len(reqRsrc) {
klog.Errorf("[memorymanager] The number of requested resources by the container %s differs from the number of memory blocks", ctn.Name)
return nil
}
for _, b := range ctnBlocks {
if _, ok := reqRsrc[b.Type]; !ok {
klog.Errorf("[memorymanager] Container %s requested resources do not have resource of type %s", ctn.Name, b.Type)
return nil
}
if b.Size != reqRsrc[b.Type] {
klog.Errorf("[memorymanager] Memory %s already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", b.Type, pod.UID, ctn.Name, reqRsrc[b.Type], b.Size)
return nil
}
containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
if err != nil {
klog.Errorf("[memorymanager] failed to generate NUMA bitmask: %v", err)
return nil
}
klog.Infof("[memorymanager] Regenerating TopologyHints, %s was already allocated to (pod %v, container %v)", b.Type, pod.UID, ctn.Name)
hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
NUMANodeAffinity: containerNUMAAffinity,
Preferred: true,
})
}
return hints
}
func getPodRequestedResources(pod *v1.Pod) (map[v1.ResourceName]uint64, error) {
reqRsrcsByInitCtrs := make(map[v1.ResourceName]uint64)
reqRsrcsByAppCtrs := make(map[v1.ResourceName]uint64)
for _, ctr := range pod.Spec.InitContainers {
reqRsrcs, err := getRequestedResources(&ctr)
if err != nil {
return nil, err
}
for rsrcName, qty := range reqRsrcs {
if _, ok := reqRsrcsByInitCtrs[rsrcName]; !ok {
reqRsrcsByInitCtrs[rsrcName] = uint64(0)
}
if reqRsrcs[rsrcName] > reqRsrcsByInitCtrs[rsrcName] {
reqRsrcsByInitCtrs[rsrcName] = qty
}
}
}
for _, ctr := range pod.Spec.Containers {
reqRsrcs, err := getRequestedResources(&ctr)
if err != nil {
return nil, err
}
for rsrcName, qty := range reqRsrcs {
if _, ok := reqRsrcsByAppCtrs[rsrcName]; !ok {
reqRsrcsByAppCtrs[rsrcName] = uint64(0)
}
reqRsrcsByAppCtrs[rsrcName] += qty
}
}
for rsrcName := range reqRsrcsByAppCtrs {
if reqRsrcsByInitCtrs[rsrcName] > reqRsrcsByAppCtrs[rsrcName] {
reqRsrcsByAppCtrs[rsrcName] = reqRsrcsByInitCtrs[rsrcName]
}
}
return reqRsrcsByAppCtrs, nil
}
func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return nil
}
reqRsrcs, err := getPodRequestedResources(pod)
if err != nil {
klog.Error(err.Error())
return nil
}
for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name)
// Short circuit to regenerate the same hints if memory is already
// allocated for the container. This might happen after a
// kubelet restart, for example.
if containerBlocks != nil {
return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
}
}
return p.calculateHints(s, reqRsrcs)
}
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return nil
}
requestedResources, err := getRequestedResources(container)
if err != nil {
klog.Error(err.Error())
return nil
}
containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name)
// Short circuit to regenerate the same hints if memory is already
// allocated for the container. This might happen after a
// kubelet restart, for example.
if containerBlocks != nil {
return regenerateHints(pod, container, containerBlocks, requestedResources)
}
return p.calculateHints(s, requestedResources)
}
func getRequestedResources(container *v1.Container) (map[v1.ResourceName]uint64, error) {
requestedResources := map[v1.ResourceName]uint64{}
for resourceName, quantity := range container.Resources.Requests {
if resourceName != v1.ResourceMemory && !corehelper.IsHugePageResourceName(resourceName) {
continue
}
requestedSize, succeed := quantity.AsInt64()
if !succeed {
return nil, fmt.Errorf("[memorymanager] failed to represent quantity as int64")
}
requestedResources[resourceName] = uint64(requestedSize)
}
return requestedResources, nil
}
func (p *staticPolicy) calculateHints(s state.State, requestedResources map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
machineState := s.GetMachineState()
var numaNodes []int
for n := range machineState {
numaNodes = append(numaNodes, n)
}
sort.Ints(numaNodes)
// Initialize minAffinitySize to include all NUMA Cells.
minAffinitySize := len(numaNodes)
hints := map[string][]topologymanager.TopologyHint{}
bitmask.IterateBitMasks(numaNodes, func(mask bitmask.BitMask) {
maskBits := mask.GetBits()
singleNUMAHint := len(maskBits) == 1
// the node is already in a group with another node; it cannot be used for a single NUMA node allocation
if singleNUMAHint && len(machineState[maskBits[0]].Cells) > 1 {
return
}
totalFreeSize := map[v1.ResourceName]uint64{}
totalAllocatableSize := map[v1.ResourceName]uint64{}
// calculate total free memory for the node mask
for _, nodeID := range maskBits {
// the node is already used for a memory allocation
if !singleNUMAHint && machineState[nodeID].NumberOfAssignments > 0 {
// the node is used for a single NUMA node memory allocation, so it cannot be used for a multi NUMA node allocation
if len(machineState[nodeID].Cells) == 1 {
return
}
// the node is already used with a different group of nodes, so it cannot be used within the current hint
if !areGroupsEqual(machineState[nodeID].Cells, maskBits) {
return
}
}
for resourceName := range requestedResources {
if _, ok := totalFreeSize[resourceName]; !ok {
totalFreeSize[resourceName] = 0
}
totalFreeSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Free
if _, ok := totalAllocatableSize[resourceName]; !ok {
totalAllocatableSize[resourceName] = 0
}
totalAllocatableSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Allocatable
}
}
// verify that for all memory types the node mask has enough allocatable resources
for resourceName, requestedSize := range requestedResources {
if totalAllocatableSize[resourceName] < requestedSize {
return
}
}
// track the minimum number of NUMA nodes that can satisfy the container resource requests
if mask.Count() < minAffinitySize {
minAffinitySize = mask.Count()
}
// verify that for all memory types the node mask has enough free resources
for resourceName, requestedSize := range requestedResources {
if totalFreeSize[resourceName] < requestedSize {
return
}
}
// add the node mask as topology hint for all memory types
for resourceName := range requestedResources {
if _, ok := hints[string(resourceName)]; !ok {
hints[string(resourceName)] = []topologymanager.TopologyHint{}
}
hints[string(resourceName)] = append(hints[string(resourceName)], topologymanager.TopologyHint{
NUMANodeAffinity: mask,
Preferred: false,
})
}
})
// update the preferred field of the hints; the default behaviour is to prefer
// the hints with the minimal number of NUMA nodes
for resourceName := range requestedResources {
for i, hint := range hints[string(resourceName)] {
hints[string(resourceName)][i].Preferred = p.isHintPreferred(hint.NUMANodeAffinity.GetBits(), minAffinitySize)
}
}
return hints
}
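// Worked example for calculateHints (illustrative, the sizes are assumptions):
// with two NUMA nodes that each have 2Gi free and allocatable and a 1Gi
// request, IterateBitMasks yields the masks {0}, {1} and {0,1}. All three fit
// the request, so all three become hints, and minAffinitySize ends up as 1.
// The final pass then marks only the single-node hints as Preferred, because
// their bit count equals minAffinitySize.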
func (p *staticPolicy) isHintPreferred(maskBits []int, minAffinitySize int) bool {
return len(maskBits) == minAffinitySize
}
func areGroupsEqual(group1, group2 []int) bool {
sort.Ints(group1)
sort.Ints(group2)
if len(group1) != len(group2) {
return false
}
for i, elm := range group1 {
if group2[i] != elm {
return false
}
}
return true
}
func (p *staticPolicy) validateState(s state.State) error {
machineState := s.GetMachineState()
memoryAssignments := s.GetMemoryAssignments()
if len(machineState) == 0 {
// Machine state cannot be empty when assignments exist
if len(memoryAssignments) != 0 {
return fmt.Errorf("[memorymanager] machine state can not be empty when it has memory assignments")
}
defaultMachineState := p.getDefaultMachineState()
s.SetMachineState(defaultMachineState)
return nil
}
// calculate all memory assigned to containers
expectedMachineState := p.getDefaultMachineState()
for pod, container := range memoryAssignments {
for containerName, blocks := range container {
for _, b := range blocks {
requestedSize := b.Size
for _, nodeID := range b.NUMAAffinity {
nodeState, ok := expectedMachineState[nodeID]
if !ok {
return fmt.Errorf("[memorymanager] (pod: %s, container: %s) the memory assignment uses the NUMA that does not exist", pod, containerName)
}
nodeState.NumberOfAssignments++
nodeState.Cells = b.NUMAAffinity
memoryState, ok := nodeState.MemoryMap[b.Type]
if !ok {
return fmt.Errorf("[memorymanager] (pod: %s, container: %s) the memory assignment uses memory resource that does not exist", pod, containerName)
}
if requestedSize == 0 {
continue
}
// this node does not have enough memory, continue to the next one
if memoryState.Free <= 0 {
continue
}
// the node has enough memory to satisfy the request
if memoryState.Free >= requestedSize {
memoryState.Reserved += requestedSize
memoryState.Free -= requestedSize
requestedSize = 0
continue
}
// the node does not have enough memory, use the node remaining memory and move to the next node
requestedSize -= memoryState.Free
memoryState.Reserved += memoryState.Free
memoryState.Free = 0
}
}
}
}
// State has already been initialized from file (is not empty)
// Validate that the total size, system reserved and reserved memory have not changed; this can happen when:
// - adding or removing a physical memory bank from the node
// - changing the kubelet system-reserved, kube-reserved or pre-reserved-memory-zone parameters
if !areMachineStatesEqual(machineState, expectedMachineState) {
return fmt.Errorf("[memorymanager] the expected machine state is different from the real one")
}
return nil
}
func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool {
if len(ms1) != len(ms2) {
klog.Errorf("[memorymanager] node states are different len(ms1) != len(ms2): %d != %d", len(ms1), len(ms2))
return false
}
for nodeID, nodeState1 := range ms1 {
nodeState2, ok := ms2[nodeID]
if !ok {
klog.Errorf("[memorymanager] node state does not have node ID %d", nodeID)
return false
}
if nodeState1.NumberOfAssignments != nodeState2.NumberOfAssignments {
klog.Errorf("[memorymanager] node states number of assignments are different: %d != %d", nodeState1.NumberOfAssignments, nodeState2.NumberOfAssignments)
return false
}
if !areGroupsEqual(nodeState1.Cells, nodeState2.Cells) {
klog.Errorf("[memorymanager] node states groups are different: %v != %v", nodeState1.Cells, nodeState2.Cells)
return false
}
if len(nodeState1.MemoryMap) != len(nodeState2.MemoryMap) {
klog.Errorf("[memorymanager] node states memory map have different length: %d != %d", len(nodeState1.MemoryMap), len(nodeState2.MemoryMap))
return false
}
for resourceName, memoryState1 := range nodeState1.MemoryMap {
memoryState2, ok := nodeState2.MemoryMap[resourceName]
if !ok {
klog.Errorf("[memorymanager] memory state does not have resource %s", resourceName)
return false
}
if !reflect.DeepEqual(*memoryState1, *memoryState2) {
klog.Errorf("[memorymanager] memory states for the NUMA node %d and the resource %s are different: %+v != %+v", nodeID, resourceName, *memoryState1, *memoryState2)
return false
}
}
}
return true
}
func (p *staticPolicy) getDefaultMachineState() state.NUMANodeMap {
defaultMachineState := state.NUMANodeMap{}
nodeHugepages := map[int]uint64{}
for _, node := range p.machineInfo.Topology {
defaultMachineState[node.Id] = &state.NUMANodeState{
NumberOfAssignments: 0,
MemoryMap: map[v1.ResourceName]*state.MemoryTable{},
Cells: []int{node.Id},
}
// fill memory table with huge pages values
for _, hugepage := range node.HugePages {
hugepageQuantity := resource.NewQuantity(int64(hugepage.PageSize)*1024, resource.BinarySI)
resourceName := corehelper.HugePageResourceName(*hugepageQuantity)
systemReserved := p.getResourceSystemReserved(node.Id, resourceName)
totalHugepagesSize := hugepage.NumPages * hugepage.PageSize * 1024
allocatable := totalHugepagesSize - systemReserved
defaultMachineState[node.Id].MemoryMap[resourceName] = &state.MemoryTable{
Allocatable: allocatable,
Free: allocatable,
Reserved: 0,
SystemReserved: systemReserved,
TotalMemSize: totalHugepagesSize,
}
if _, ok := nodeHugepages[node.Id]; !ok {
nodeHugepages[node.Id] = 0
}
nodeHugepages[node.Id] += totalHugepagesSize
}
// fill memory table with regular memory values
systemReserved := p.getResourceSystemReserved(node.Id, v1.ResourceMemory)
allocatable := node.Memory - systemReserved
// remove memory allocated by hugepages
if allocatedByHugepages, ok := nodeHugepages[node.Id]; ok {
allocatable -= allocatedByHugepages
}
defaultMachineState[node.Id].MemoryMap[v1.ResourceMemory] = &state.MemoryTable{
Allocatable: allocatable,
Free: allocatable,
Reserved: 0,
SystemReserved: systemReserved,
TotalMemSize: node.Memory,
}
}
return defaultMachineState
}
func (p *staticPolicy) getResourceSystemReserved(nodeID int, resourceName v1.ResourceName) uint64 {
var systemReserved uint64
if nodeSystemReserved, ok := p.systemReserved[nodeID]; ok {
if nodeMemorySystemReserved, ok := nodeSystemReserved[resourceName]; ok {
systemReserved = nodeMemorySystemReserved
}
}
return systemReserved
}
func (p *staticPolicy) getDefaultHint(s state.State, requestedResources map[v1.ResourceName]uint64) (*topologymanager.TopologyHint, error) {
hints := p.calculateHints(s, requestedResources)
if len(hints) < 1 {
return nil, fmt.Errorf("[memorymanager] failed to get the default NUMA affinity, no NUMA nodes with enough memory is available")
}
// hints for all memory types should be the same, so we will check hints only for regular memory type
return findBestHint(hints[string(v1.ResourceMemory)]), nil
}
func isAffinitySatisfyRequest(machineState state.NUMANodeMap, mask bitmask.BitMask, requestedResources map[v1.ResourceName]uint64) bool {
totalFreeSize := map[v1.ResourceName]uint64{}
for _, nodeID := range mask.GetBits() {
for resourceName := range requestedResources {
if _, ok := totalFreeSize[resourceName]; !ok {
totalFreeSize[resourceName] = 0
}
totalFreeSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Free
}
}
// verify that for all memory types the node mask has enough resources
for resourceName, requestedSize := range requestedResources {
if totalFreeSize[resourceName] < requestedSize {
return false
}
}
return true
}
// extendTopologyManagerHint extends the topology manager hint when it does not satisfy the container request.
// The topology manager uses bitwise AND to merge all topology hints into the best one, so under the restricted policy
// it is possible to get a subset of the hint that we provided to the topology manager; in this case we want to extend
// it to the original one
func (p *staticPolicy) extendTopologyManagerHint(s state.State, requestedResources map[v1.ResourceName]uint64, mask bitmask.BitMask) (*topologymanager.TopologyHint, error) {
hints := p.calculateHints(s, requestedResources)
var filteredHints []topologymanager.TopologyHint
// hints for all memory types should be the same, so we will check hints only for regular memory type
for _, hint := range hints[string(v1.ResourceMemory)] {
affinityBits := hint.NUMANodeAffinity.GetBits()
// filter out all hints that do not include the current hint
if isHintInGroup(mask.GetBits(), affinityBits) {
filteredHints = append(filteredHints, hint)
}
}
if len(filteredHints) < 1 {
return nil, fmt.Errorf("[memorymanager] failed to find NUMA nodes to extend the current topology hint")
}
// try to find the preferred hint with the minimal number of NUMA nodes, relevant for the restricted policy
return findBestHint(filteredHints), nil
}
func isHintInGroup(hint []int, group []int) bool {
sort.Ints(hint)
sort.Ints(group)
hintIndex := 0
for i := range group {
if hintIndex == len(hint) {
return true
}
if group[i] != hint[hintIndex] {
continue
}
hintIndex++
}
// the loop above can end right after matching the hint's last element,
// so check the subset condition once more here
return hintIndex == len(hint)
}
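// Example for isHintInGroup (illustrative): isHintInGroup([]int{1}, []int{0, 1, 2})
// returns true, since every element of the hint appears in the group, while
// isHintInGroup([]int{1, 3}, []int{0, 1, 2}) returns false because 3 is
// missing from the group.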
func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.TopologyHint {
// try to find the preferred hint with the minimal number of NUMA nodes, relevant for the restricted policy
bestHint := topologymanager.TopologyHint{}
for _, hint := range hints {
if bestHint.NUMANodeAffinity == nil {
bestHint = hint
continue
}
// the current hint is preferred, while the best hint so far is not
if hint.Preferred && !bestHint.Preferred {
bestHint = hint
continue
}
// both hints have the same preferred value, but the current hint has fewer NUMA nodes than the best one
if hint.Preferred == bestHint.Preferred && hint.NUMANodeAffinity.IsNarrowerThan(bestHint.NUMANodeAffinity) {
bestHint = hint
}
}
return &bestHint
}
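To make the spill-over bookkeeping in Allocate and validateState concrete, here is a small self-contained sketch. It is illustrative only: the table mirrors the Free and Reserved fields of state.MemoryTable, and the node sizes and the 3Gi request are assumptions.

package main

import "fmt"

// memoryTable mirrors the Free/Reserved bookkeeping of state.MemoryTable.
type memoryTable struct {
	Free     uint64
	Reserved uint64
}

// reserveAcrossNodes walks the NUMA nodes of an affinity mask in order and
// reserves the requested size node by node, the same way the loops inside
// Allocate and validateState do.
func reserveAcrossNodes(nodes []*memoryTable, requested uint64) {
	for _, n := range nodes {
		if requested == 0 || n.Free == 0 {
			continue
		}
		if n.Free >= requested {
			// this node alone satisfies the remaining request
			n.Reserved += requested
			n.Free -= requested
			requested = 0
			continue
		}
		// not enough free memory on this node: take what is left and move on
		requested -= n.Free
		n.Reserved += n.Free
		n.Free = 0
	}
}

func main() {
	const gi = uint64(1024 * 1024 * 1024)
	node0 := &memoryTable{Free: 2 * gi}
	node1 := &memoryTable{Free: 2 * gi}
	// a 3Gi request against the mask {0, 1}: 2Gi lands on node 0, 1Gi on node 1
	reserveAcrossNodes([]*memoryTable{node0, node1}, 3*gi)
	fmt.Printf("node0 reserved %dGi, node1 reserved %dGi\n", node0.Reserved/gi, node1.Reserved/gi)
}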

File diff suppressed because it is too large

View File

@ -0,0 +1,46 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"state.go",
"state_checkpoint.go",
"state_mem.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["state_checkpoint_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/state/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,65 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
var _ checkpointmanager.Checkpoint = &MemoryManagerCheckpoint{}
// MemoryManagerCheckpoint struct is used to store memory/pod assignments in a checkpoint
type MemoryManagerCheckpoint struct {
PolicyName string `json:"policyName"`
MachineState NUMANodeMap `json:"machineState"`
Entries ContainerMemoryAssignments `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// NewMemoryManagerCheckpoint returns an instance of Checkpoint
func NewMemoryManagerCheckpoint() *MemoryManagerCheckpoint {
//lint:ignore unexported-type-in-api user-facing error message
return &MemoryManagerCheckpoint{
Entries: ContainerMemoryAssignments{},
MachineState: NUMANodeMap{},
}
}
// MarshalCheckpoint returns marshalled checkpoint
func (mp *MemoryManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
mp.Checksum = 0
mp.Checksum = checksum.New(mp)
return json.Marshal(*mp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (mp *MemoryManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, mp)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid
func (mp *MemoryManagerCheckpoint) VerifyChecksum() error {
ck := mp.Checksum
mp.Checksum = 0
err := ck.Verify(mp)
mp.Checksum = ck
return err
}
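For orientation, the intended round trip looks roughly like this. It is a sketch, assuming the snippet runs inside a function that returns error; the policy name is an example value.

cp := NewMemoryManagerCheckpoint()
cp.PolicyName = "static"
// MarshalCheckpoint zeroes the stored checksum, recomputes it over the
// checkpoint content and embeds it in the serialized bytes
blob, err := cp.MarshalCheckpoint()
if err != nil {
	return err
}
restored := NewMemoryManagerCheckpoint()
if err := restored.UnmarshalCheckpoint(blob); err != nil {
	return err
}
// VerifyChecksum fails if the bytes were modified after marshalling
if err := restored.VerifyChecksum(); err != nil {
	return err
}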

View File

@ -0,0 +1,130 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
v1 "k8s.io/api/core/v1"
)
// MemoryTable contains memory information
type MemoryTable struct {
TotalMemSize uint64 `json:"total"`
SystemReserved uint64 `json:"systemReserved"`
Allocatable uint64 `json:"allocatable"`
Reserved uint64 `json:"reserved"`
Free uint64 `json:"free"`
}
// NUMANodeState contains NUMA node related information
type NUMANodeState struct {
// NumberOfAssignments contains the number of memory assignments from this node
// When a container requires both memory and hugepages, it will increase the number of assignments by two
NumberOfAssignments int `json:"numberOfAssignments"`
// MemoryTable contains NUMA node memory related information
MemoryMap map[v1.ResourceName]*MemoryTable `json:"memoryMap"`
// Cells contains the current NUMA node and all other nodes that are in a group with the current NUMA node
// This parameter indicates whether the current node is used for a multi NUMA node memory allocation
// For example, if a container is pinned to NUMA nodes 0,1,2, each of the NUMA nodes 0,1,2 under the state will have
// this parameter equal to [0, 1, 2]
Cells []int `json:"cells"`
}
// NUMANodeMap contains memory information for each NUMA node.
type NUMANodeMap map[int]*NUMANodeState
// Clone returns a copy of NUMANodeMap
func (nm NUMANodeMap) Clone() NUMANodeMap {
clone := make(NUMANodeMap)
for node, s := range nm {
if s == nil {
clone[node] = nil
continue
}
clone[node] = &NUMANodeState{}
clone[node].NumberOfAssignments = s.NumberOfAssignments
clone[node].Cells = append([]int{}, s.Cells...)
if s.MemoryMap == nil {
continue
}
clone[node].MemoryMap = map[v1.ResourceName]*MemoryTable{}
for memoryType, memoryTable := range s.MemoryMap {
clone[node].MemoryMap[memoryType] = &MemoryTable{
Allocatable: memoryTable.Allocatable,
Free: memoryTable.Free,
Reserved: memoryTable.Reserved,
SystemReserved: memoryTable.SystemReserved,
TotalMemSize: memoryTable.TotalMemSize,
}
}
}
return clone
}
// Block is a data structure used to represent a certain amount of memory
type Block struct {
// NUMAAffinity contains the string that represents NUMA affinity bitmask
NUMAAffinity []int `json:"numaAffinity"`
Type v1.ResourceName `json:"type"`
Size uint64 `json:"size"`
}
// ContainerMemoryAssignments stores memory assignments of containers
type ContainerMemoryAssignments map[string]map[string][]Block
// Clone returns a copy of ContainerMemoryAssignments
func (as ContainerMemoryAssignments) Clone() ContainerMemoryAssignments {
clone := make(ContainerMemoryAssignments)
for pod := range as {
clone[pod] = make(map[string][]Block)
for container, blocks := range as[pod] {
clone[pod][container] = append([]Block{}, blocks...)
}
}
return clone
}
// Reader interface used to read current memory/pod assignment state
type Reader interface {
// GetMachineState returns Memory Map stored in the State
GetMachineState() NUMANodeMap
// GetMemoryBlocks returns memory assignments of a container
GetMemoryBlocks(podUID string, containerName string) []Block
// GetMemoryAssignments returns ContainerMemoryAssignments
GetMemoryAssignments() ContainerMemoryAssignments
}
type writer interface {
// SetMachineState stores NUMANodeMap in State
SetMachineState(memoryMap NUMANodeMap)
// SetMemoryBlocks stores memory assignments of a container
SetMemoryBlocks(podUID string, containerName string, blocks []Block)
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
SetMemoryAssignments(assignments ContainerMemoryAssignments)
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
Delete(podUID string, containerName string)
// ClearState clears machineState and ContainerMemoryAssignments
ClearState()
}
// State interface provides methods for tracking and setting memory/pod assignment
type State interface {
Reader
writer
}
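For illustration, a container pinned to NUMA node 0 with a 1Gi block would be represented like this. The pod UID, container name and size are assumptions; note that Clone copies the per-container block slices, so changing a block in the copy leaves the original untouched.

assignments := ContainerMemoryAssignments{
	"pod-uid": map[string][]Block{
		"container1": {
			{
				NUMAAffinity: []int{0},
				Type:         v1.ResourceMemory,
				Size:         1073741824, // 1Gi
			},
		},
	},
}
copied := assignments.Clone()
copied["pod-uid"]["container1"][0].Size = 0 // the original Block keeps Size 1Gi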

View File

@ -0,0 +1,184 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"path"
"sync"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
var _ State = &stateCheckpoint{}
type stateCheckpoint struct {
sync.RWMutex
cache State
policyName string
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
}
// NewCheckpointState creates a new State for keeping track of memory/pod assignments with a checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
cache: NewMemoryState(),
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
}
if err := stateCheckpoint.restoreState(); err != nil {
//lint:ignore ST1005 user-facing error message
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the memory manager checkpoint file %q before restarting Kubelet",
err, path.Join(stateDir, checkpointName))
}
return stateCheckpoint, nil
}
// restores state from a checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) restoreState() error {
sc.Lock()
defer sc.Unlock()
var err error
checkpoint := NewMemoryManagerCheckpoint()
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound {
return sc.storeState()
}
return err
}
if sc.policyName != checkpoint.PolicyName {
return fmt.Errorf("[memorymanager] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
}
sc.cache.SetMachineState(checkpoint.MachineState)
sc.cache.SetMemoryAssignments(checkpoint.Entries)
klog.V(2).Info("[memorymanager] state checkpoint: restored state from checkpoint")
return nil
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) storeState() error {
checkpoint := NewMemoryManagerCheckpoint()
checkpoint.PolicyName = sc.policyName
checkpoint.MachineState = sc.cache.GetMachineState()
checkpoint.Entries = sc.cache.GetMemoryAssignments()
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
klog.Errorf("[memorymanager] could not save checkpoint: %v", err)
return err
}
return nil
}
// GetMachineState returns the NUMANodeMap stored in the State
func (sc *stateCheckpoint) GetMachineState() NUMANodeMap {
sc.RLock()
defer sc.RUnlock()
return sc.cache.GetMachineState()
}
// GetMemoryBlocks returns memory assignments of a container
func (sc *stateCheckpoint) GetMemoryBlocks(podUID string, containerName string) []Block {
sc.RLock()
defer sc.RUnlock()
return sc.cache.GetMemoryBlocks(podUID, containerName)
}
// GetMemoryAssignments returns ContainerMemoryAssignments
func (sc *stateCheckpoint) GetMemoryAssignments() ContainerMemoryAssignments {
sc.RLock()
defer sc.RUnlock()
return sc.cache.GetMemoryAssignments()
}
// SetMachineState stores NUMANodeMap in State
func (sc *stateCheckpoint) SetMachineState(memoryMap NUMANodeMap) {
sc.Lock()
defer sc.Unlock()
sc.cache.SetMachineState(memoryMap)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// SetMemoryBlocks stores memory assignments of a container
func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
sc.Lock()
defer sc.Unlock()
sc.cache.SetMemoryBlocks(podUID, containerName, blocks)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
sc.Lock()
defer sc.Unlock()
sc.cache.SetMemoryAssignments(assignments)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
sc.Lock()
defer sc.Unlock()
sc.cache.Delete(podUID, containerName)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// ClearState clears machineState and ContainerMemoryAssignments
func (sc *stateCheckpoint) ClearState() {
sc.Lock()
defer sc.Unlock()
sc.cache.ClearState()
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
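A usage sketch: the state directory, checkpoint name and pod data below are illustrative assumptions, and the snippet is assumed to run inside a function that returns error; the real caller is the memory manager itself.

st, err := NewCheckpointState("/var/lib/kubelet", "memory_manager_state", "static")
if err != nil {
	// a policy mismatch or a corrupted checkpoint surfaces here, together
	// with the advice to drain the node and delete the checkpoint file
	return err
}
// every mutation is written back to the checkpoint (write failures are only logged)
st.SetMemoryBlocks("pod-uid", "container1", []Block{
	{NUMAAffinity: []int{0}, Type: v1.ResourceMemory, Size: 1073741824},
})
blocks := st.GetMemoryBlocks("pod-uid", "container1")
_ = blocks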

View File

@ -0,0 +1,384 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"io/ioutil"
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
)
const testingCheckpoint = "memorymanager_checkpoint_test"
// assertStateEqual marks provided test as failed if provided states differ
func assertStateEqual(t *testing.T, restoredState, expectedState State) {
expectedMachineState := expectedState.GetMachineState()
restoredMachineState := restoredState.GetMachineState()
assert.Equal(t, expectedMachineState, restoredMachineState, "expected MachineState does not equal the restored one")
expectedMemoryAssignments := expectedState.GetMemoryAssignments()
restoredMemoryAssignments := restoredState.GetMemoryAssignments()
assert.Equal(t, expectedMemoryAssignments, restoredMemoryAssignments, "state memory assignments mismatch")
}
func TestCheckpointStateRestore(t *testing.T) {
testCases := []struct {
description string
checkpointContent string
expectedError string
expectedState *stateMemory
}{
{
"Restore non-existing checkpoint",
"",
"",
&stateMemory{},
},
{
"Restore valid checkpoint",
`{
"policyName":"static",
"machineState":{"0":{"numberOfAssignments":0,"memoryMap":{"memory":{"total":2048,"systemReserved":512,"allocatable":1536,"reserved":512,"free":1024}},"cells":[]}},
"entries":{"pod":{"container1":[{"numaAffinity":[0],"type":"memory","size":512}]}},
"checksum": 4215593881
}`,
"",
&stateMemory{
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
"container1": {
{
NUMAAffinity: []int{0},
Type: v1.ResourceMemory,
Size: 512,
},
},
},
},
machineState: NUMANodeMap{
0: &NUMANodeState{
MemoryMap: map[v1.ResourceName]*MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536,
Free: 1024,
Reserved: 512,
SystemReserved: 512,
TotalMemSize: 2048,
},
},
},
},
},
},
{
"Restore checkpoint with invalid checksum",
`{
"policyName":"static",
"machineState":{"0":{"numberOfAssignments":0,"memoryMap":{"memory":{"total":2048,"systemReserved":512,"allocatable":1536,"reserved":512,"free":1024}},"cells":[]}},
"entries":{"pod":{"container1":[{"affinity":[0],"type":"memory","size":512}]}},
"checksum": 101010
}`,
"checkpoint is corrupted",
&stateMemory{},
},
{
"Restore checkpoint with invalid JSON",
`{`,
"unexpected end of JSON input",
&stateMemory{},
},
}
// create temp dir
testingDir, err := ioutil.TempDir("", "memorymanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
// create checkpoint manager for testing
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
// ensure there is no previous checkpoint
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
// prepare checkpoint for testing
if strings.TrimSpace(tc.checkpointContent) != "" {
checkpoint := &testutil.MockCheckpoint{Content: tc.checkpointContent}
assert.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, checkpoint), "could not create testing checkpoint")
}
restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
if strings.TrimSpace(tc.expectedError) != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), "could not restore state from checkpoint: "+tc.expectedError)
} else {
assert.NoError(t, err, "unexpected error while creating checkpointState")
// compare state after restoration with the one expected
assertStateEqual(t, restoredState, tc.expectedState)
}
})
}
}
func TestCheckpointStateStore(t *testing.T) {
expectedState := &stateMemory{
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
"container1": {
{
NUMAAffinity: []int{0},
Type: v1.ResourceMemory,
Size: 1024,
},
},
},
},
machineState: NUMANodeMap{
0: &NUMANodeState{
MemoryMap: map[v1.ResourceName]*MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536,
Free: 512,
Reserved: 1024,
SystemReserved: 512,
TotalMemSize: 2048,
},
},
},
},
}
// create temp dir
testingDir, err := ioutil.TempDir("", "memorymanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpointState instance")
// set values of cs1 instance so they are stored in checkpoint and can be read by cs2
cs1.SetMachineState(expectedState.machineState)
cs1.SetMemoryAssignments(expectedState.assignments)
// restore checkpoint with previously stored values
cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpointState instance")
assertStateEqual(t, cs2, expectedState)
}
func TestCheckpointStateHelpers(t *testing.T) {
testCases := []struct {
description string
machineState NUMANodeMap
assignments ContainerMemoryAssignments
}{
{
description: "One container",
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
"container1": {
{
NUMAAffinity: []int{0},
Type: v1.ResourceMemory,
Size: 1024,
},
},
},
},
machineState: NUMANodeMap{
0: &NUMANodeState{
MemoryMap: map[v1.ResourceName]*MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536,
Free: 512,
Reserved: 1024,
SystemReserved: 512,
TotalMemSize: 2048,
},
},
Cells: []int{},
},
},
},
{
description: "Two containers",
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
"container1": {
{
NUMAAffinity: []int{0},
Type: v1.ResourceMemory,
Size: 512,
},
},
"container2": {
{
NUMAAffinity: []int{0},
Type: v1.ResourceMemory,
Size: 512,
},
},
},
},
machineState: NUMANodeMap{
0: &NUMANodeState{
MemoryMap: map[v1.ResourceName]*MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536,
Free: 512,
Reserved: 1024,
SystemReserved: 512,
TotalMemSize: 2048,
},
},
Cells: []int{},
},
},
},
{
description: "Container without assigned memory",
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
"container1": {},
},
},
machineState: NUMANodeMap{
0: &NUMANodeState{
MemoryMap: map[v1.ResourceName]*MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536,
Free: 1536,
Reserved: 0,
SystemReserved: 512,
TotalMemSize: 2048,
},
},
Cells: []int{},
},
},
},
}
// create temp dir
testingDir, err := ioutil.TempDir("", "memorymanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
// ensure there is no previous checkpoint
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
state, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpoint manager")
state.SetMachineState(tc.machineState)
assert.Equal(t, tc.machineState, state.GetMachineState(), "machine state inconsistent")
for pod := range tc.assignments {
for container, blocks := range tc.assignments[pod] {
state.SetMemoryBlocks(pod, container, blocks)
assert.Equal(t, blocks, state.GetMemoryBlocks(pod, container), "memory block inconsistent")
state.Delete(pod, container)
assert.Nil(t, state.GetMemoryBlocks(pod, container), "deleted container still existing in state")
}
}
})
}
}
func TestCheckpointStateClear(t *testing.T) {
testCases := []struct {
description string
machineState NUMANodeMap
assignments ContainerMemoryAssignments
}{
{
description: "Valid state cleaning",
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
"container1": {
{
NUMAAffinity: []int{0},
Type: v1.ResourceMemory,
Size: 1024,
},
},
},
},
machineState: NUMANodeMap{
0: &NUMANodeState{
MemoryMap: map[v1.ResourceName]*MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536,
Free: 512,
Reserved: 1024,
SystemReserved: 512,
TotalMemSize: 2048,
},
},
},
},
},
}
// create temp dir
testingDir, err := ioutil.TempDir("", "memorymanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
state, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpoint manager")
state.SetMachineState(tc.machineState)
state.SetMemoryAssignments(tc.assignments)
state.ClearState()
assert.Equal(t, NUMANodeMap{}, state.GetMachineState(), "cleared state with non-empty machine state")
assert.Equal(t, ContainerMemoryAssignments{}, state.GetMemoryAssignments(), "cleared state with non-empty memory assignments")
})
}
}

View File

@ -0,0 +1,123 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"sync"
"k8s.io/klog/v2"
)
type stateMemory struct {
sync.RWMutex
assignments ContainerMemoryAssignments
machineState NUMANodeMap
}
var _ State = &stateMemory{}
// NewMemoryState creates a new State for keeping track of memory/pod assignments
func NewMemoryState() State {
klog.Infof("[memorymanager] initializing new in-memory state store")
return &stateMemory{
assignments: ContainerMemoryAssignments{},
machineState: NUMANodeMap{},
}
}
// GetMachineState returns the NUMANodeMap stored in the State
func (s *stateMemory) GetMachineState() NUMANodeMap {
s.RLock()
defer s.RUnlock()
return s.machineState.Clone()
}
// GetMemoryBlocks returns memory assignments of a container
func (s *stateMemory) GetMemoryBlocks(podUID string, containerName string) []Block {
s.RLock()
defer s.RUnlock()
if res, ok := s.assignments[podUID][containerName]; ok {
return append([]Block{}, res...)
}
return nil
}
// GetMemoryAssignments returns ContainerMemoryAssignments
func (s *stateMemory) GetMemoryAssignments() ContainerMemoryAssignments {
s.RLock()
defer s.RUnlock()
return s.assignments.Clone()
}
// SetMachineState stores NUMANodeMap in State
func (s *stateMemory) SetMachineState(nodeMap NUMANodeMap) {
s.Lock()
defer s.Unlock()
s.machineState = nodeMap.Clone()
klog.Info("[memorymanager] updated machine memory state")
}
// SetMemoryBlocks stores memory assignments of a container
func (s *stateMemory) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
s.Lock()
defer s.Unlock()
if _, ok := s.assignments[podUID]; !ok {
s.assignments[podUID] = map[string][]Block{}
}
s.assignments[podUID][containerName] = append([]Block{}, blocks...)
klog.Infof("[memorymanager] updated memory state (pod: %s, container: %s)", podUID, containerName)
}
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
func (s *stateMemory) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
s.Lock()
defer s.Unlock()
s.assignments = assignments.Clone()
}
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
func (s *stateMemory) Delete(podUID string, containerName string) {
s.Lock()
defer s.Unlock()
if _, ok := s.assignments[podUID]; !ok {
return
}
delete(s.assignments[podUID], containerName)
if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
klog.V(2).Infof("[memorymanager] deleted memory assignment (pod: %s, container: %s)", podUID, containerName)
}
// ClearState clears machineState and ContainerMemoryAssignments
func (s *stateMemory) ClearState() {
s.Lock()
defer s.Unlock()
s.machineState = NUMANodeMap{}
s.assignments = make(ContainerMemoryAssignments)
klog.V(2).Infof("[memorymanager] cleared state")
}

View File

@ -11,6 +11,10 @@ go_library(
srcs = ["flags.go"],
importpath = "k8s.io/kubernetes/pkg/util/flag",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
@ -34,5 +38,11 @@ go_test(
name = "go_default_test",
srcs = ["flags_test.go"],
embed = [":go_default_library"],
deps = ["//vendor/github.com/spf13/pflag:go_default_library"],
deps = [
"//pkg/kubelet/apis/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)

View File

@ -19,10 +19,17 @@ package flag
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilnet "k8s.io/apimachinery/pkg/util/net"
corev1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
utilsnet "k8s.io/utils/net"
)
@ -32,6 +39,7 @@ var (
_ pflag.Value = &IPVar{}
_ pflag.Value = &IPPortVar{}
_ pflag.Value = &PortRangeVar{}
_ pflag.Value = &ReservedMemoryVar{}
)
// IPVar is used for validating a command line option that represents an IP. It implements the pflag.Value interface
@ -151,3 +159,99 @@ func (v PortRangeVar) String() string {
func (v PortRangeVar) Type() string {
return "port-range"
}
// ReservedMemoryVar is used for validating a command line option that represents reserved memory. It implements the pflag.Value interface
type ReservedMemoryVar struct {
Value *[]kubeletconfig.MemoryReservation
initialized bool // set to true after the first Set call
}
// Set sets the flag value
func (v *ReservedMemoryVar) Set(s string) error {
if v.Value == nil {
return fmt.Errorf("no target (nil pointer to *[]MemoryReservation")
}
if s == "" {
v.Value = nil
return nil
}
if !v.initialized || *v.Value == nil {
*v.Value = make([]kubeletconfig.MemoryReservation, 0)
v.initialized = true
}
if s == "" {
return nil
}
numaNodeReservation := strings.Split(s, ":")
if len(numaNodeReservation) != 2 {
return fmt.Errorf("the reserved memory has incorrect format, expected numaNodeID:type=quantity[,type=quantity...], got %s", s)
}
memoryTypeReservations := strings.Split(numaNodeReservation[1], ",")
if len(memoryTypeReservations) < 1 {
return fmt.Errorf("the reserved memory has incorrect format, expected numaNodeID:type=quantity[,type=quantity...], got %s", s)
}
numaNodeID, err := strconv.Atoi(numaNodeReservation[0])
if err != nil {
return fmt.Errorf("failed to convert the NUMA node ID, exptected integer, got %s", numaNodeReservation[0])
}
memoryReservation := kubeletconfig.MemoryReservation{
NumaNode: int32(numaNodeID),
Limits: map[v1.ResourceName]resource.Quantity{},
}
for _, reservation := range memoryTypeReservations {
limit := strings.Split(reservation, "=")
if len(limit) != 2 {
return fmt.Errorf("the reserved limit has incorrect value, expected type=quantatity, got %s", reservation)
}
resourceName := v1.ResourceName(limit[0])
if resourceName != v1.ResourceMemory && !corev1helper.IsHugePageResourceName(resourceName) {
return fmt.Errorf("memory type conversion error, unknown type: %q", resourceName)
}
q, err := resource.ParseQuantity(limit[1])
if err != nil {
return fmt.Errorf("failed to parse the quantatity, expected quantatity, got %s", limit[1])
}
memoryReservation.Limits[resourceName] = q
}
*v.Value = append(*v.Value, memoryReservation)
return nil
}
// String returns the flag value
func (v *ReservedMemoryVar) String() string {
if v == nil || v.Value == nil {
return ""
}
var slices []string
for _, reservedMemory := range *v.Value {
var limits []string
for resourceName, q := range reservedMemory.Limits {
limits = append(limits, fmt.Sprintf("%s=%s", resourceName, q.String()))
}
sort.Strings(limits)
slices = append(slices, fmt.Sprintf("%d:%s", reservedMemory.NumaNode, strings.Join(limits, ",")))
}
sort.Strings(slices)
return strings.Join(slices, ",")
}
// Type gets the flag type
func (v *ReservedMemoryVar) Type() string {
return "reserved-memory"
}

View File

@ -17,10 +17,16 @@ limitations under the License.
package flag
import (
"fmt"
"strings"
"testing"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
func TestIPVar(t *testing.T) {
@ -163,3 +169,121 @@ func TestIPPortVar(t *testing.T) {
}
}
}
func TestReservedMemoryVar(t *testing.T) {
resourceNameHugepages1Gi := v1.ResourceName(fmt.Sprintf("%s1Gi", v1.ResourceHugePagesPrefix))
memory1Gi := resource.MustParse("1Gi")
testCases := []struct {
desc string
argc string
expectErr bool
expectVal []kubeletconfig.MemoryReservation
}{
{
desc: "valid input",
argc: "blah --reserved-memory=0:memory=1Gi",
expectVal: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: memory1Gi,
},
},
},
},
{
desc: "valid input with multiple memory types",
argc: "blah --reserved-memory=0:memory=1Gi,hugepages-1Gi=1Gi",
expectVal: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: memory1Gi,
resourceNameHugepages1Gi: memory1Gi,
},
},
},
},
{
desc: "valid input with multiple reserved-memory arguments",
argc: "blah --reserved-memory=0:memory=1Gi,hugepages-1Gi=1Gi --reserved-memory=1:memory=1Gi",
expectVal: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
v1.ResourceMemory: memory1Gi,
resourceNameHugepages1Gi: memory1Gi,
},
},
{
NumaNode: 1,
Limits: v1.ResourceList{
v1.ResourceMemory: memory1Gi,
},
},
},
},
{
desc: "invalid input",
argc: "blah --reserved-memory=bad-input",
expectVal: nil,
expectErr: true,
},
{
desc: "invalid input without memory types",
argc: "blah --reserved-memory=0:",
expectVal: nil,
expectErr: true,
},
{
desc: "invalid input with non-integer NUMA node",
argc: "blah --reserved-memory=a:memory=1Gi",
expectVal: nil,
expectErr: true,
},
{
desc: "invalid input with invalid limit",
argc: "blah --reserved-memory=0:memory=",
expectVal: nil,
expectErr: true,
},
{
desc: "invalid input with invalid memory type",
argc: "blah --reserved-memory=0:type=1Gi",
expectVal: nil,
expectErr: true,
},
{
desc: "invalid input with invalid quantity",
argc: "blah --reserved-memory=0:memory=1Be",
expectVal: nil,
expectErr: true,
},
}
for _, tc := range testCases {
fs := pflag.NewFlagSet("blah", pflag.PanicOnError)
var reservedMemory []kubeletconfig.MemoryReservation
fs.Var(&ReservedMemoryVar{Value: &reservedMemory}, "reserved-memory", "--reserved-memory 0:memory=1Gi,hugepages-1M=2Gi")
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
fs.Parse(strings.Split(tc.argc, " "))
}()
if tc.expectErr && err == nil {
t.Fatalf("%q: Did not observe an expected error", tc.desc)
}
if !tc.expectErr && err != nil {
t.Fatalf("%q: Observed an unexpected error: %v", tc.desc, err)
}
if !apiequality.Semantic.DeepEqual(reservedMemory, tc.expectVal) {
t.Fatalf("%q: Unexpected reserved-error: expected %v, saw %v", tc.desc, tc.expectVal, reservedMemory)
}
}
}

View File

@ -73,6 +73,13 @@ const (
// PodTopologyManagerScope represents that
// topology policy is applied on a per-pod basis.
PodTopologyManagerScope = "pod"
// NoneMemoryManagerPolicy is the name of the "none" memory manager policy; under this policy
// the memory manager does not pin the memory of guaranteed pods' containers
NoneMemoryManagerPolicy = "None"
// StaticMemoryManagerPolicy is the name of the "static" memory manager policy; under this policy
// the memory manager tries to pin the memory of guaranteed pods' containers to the smallest
// possible subset of NUMA nodes
StaticMemoryManagerPolicy = "Static"
)
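As a hedged illustration (not in the diff), these constants are the values the new MemoryManagerPolicy field below is expected to carry:

cfg := &KubeletConfiguration{
	MemoryManagerPolicy: StaticMemoryManagerPolicy, // or NoneMemoryManagerPolicy to disable pinning
}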
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -423,7 +430,7 @@ type KubeletConfiguration struct {
// Requires the CPUManager feature gate to be enabled.
// Dynamic Kubelet Config (beta): This field should not be updated without a full node
// reboot. It is safest to keep this value the same as the local config.
// Default: "none"
// Default: "None"
// +optional
CPUManagerPolicy string `json:"cpuManagerPolicy,omitempty"`
// CPU Manager reconciliation period.
@@ -433,6 +440,13 @@ type KubeletConfiguration struct {
// Default: "10s"
// +optional
CPUManagerReconcilePeriod metav1.Duration `json:"cpuManagerReconcilePeriod,omitempty"`
// MemoryManagerPolicy is the name of the policy to use by memory manager.
// Requires the MemoryManager feature gate to be enabled.
// Dynamic Kubelet Config (beta): This field should not be updated without a full node
// reboot. It is safest to keep this value the same as the local config.
// Default: "none"
// +optional
MemoryManagerPolicy string `json:"memoryManagerPolicy,omitempty"`
// TopologyManagerPolicy is the name of the policy to use.
// Policies other than "none" require the TopologyManager feature gate to be enabled.
// Dynamic Kubelet Config (beta): This field should not be updated without a full node
@@ -824,6 +838,22 @@ type KubeletConfiguration struct {
// Default: "10s"
// +optional
ShutdownGracePeriodCriticalPods metav1.Duration `json:"shutdownGracePeriodCriticalPods,omitempty"`
// ReservedMemory specifies a comma-separated list of memory reservations for NUMA nodes.
// The parameter is only relevant in the context of the memory manager feature; the memory manager will not allocate reserved memory for container workloads.
// For example, if NUMA node 0 has 10Gi of memory and 1Gi of it is reserved via ReservedMemory,
// the memory manager will assume that only 9Gi is available for allocation.
// You can specify different amounts per NUMA node and per memory type.
// You may omit this parameter entirely, but be aware that the amount of reserved memory across all NUMA nodes
// should equal the amount of memory specified via the node allocatable features (https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/#node-allocatable).
// If at least one node allocatable parameter has a non-zero value, you will need to specify at least one NUMA node.
// Also, avoid specifying:
// 1. Duplicates, i.e. the same NUMA node and memory type, but with a different value.
// 2. Zero limits for any memory type.
// 3. NUMA node IDs that do not exist on the machine.
// 4. Memory types other than memory and hugepages-<size>.
// Default: nil
// +optional
ReservedMemory []MemoryReservation `json:"reservedMemory,omitempty"`
}
type KubeletAuthorizationMode string
@@ -904,3 +934,9 @@ type SerializedNodeConfigSource struct {
// +optional
Source v1.NodeConfigSource `json:"source,omitempty" protobuf:"bytes,1,opt,name=source"`
}
// MemoryReservation specifies the memory reservation of different types for each NUMA node
type MemoryReservation struct {
NumaNode int32 `json:"numaNode"`
Limits v1.ResourceList `json:"limits"`
}
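To make the reservation arithmetic described above concrete, here is a rough sketch (not the kubelet's actual validation code) that totals reservations across NUMA nodes; per the doc comment, for each resource this total should match the memory reserved via the node allocatable features:

func totalReservedMemory(reservations []MemoryReservation) v1.ResourceList {
	total := v1.ResourceList{}
	for _, reservation := range reservations {
		for name, quantity := range reservation.Limits {
			sum := total[name] // zero Quantity if the resource has not been seen yet
			sum.Add(quantity)
			total[name] = sum
		}
	}
	return total
}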

View File

@@ -21,6 +21,7 @@ limitations under the License.
package v1beta1
import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@@ -303,6 +304,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
if in.ReservedMemory != nil {
in, out := &in.ReservedMemory, &out.ReservedMemory
*out = make([]MemoryReservation, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
@@ -380,6 +388,29 @@ func (in *KubeletX509Authentication) DeepCopy() *KubeletX509Authentication {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MemoryReservation) DeepCopyInto(out *MemoryReservation) {
*out = *in
if in.Limits != nil {
in, out := &in.Limits, &out.Limits
*out = make(corev1.ResourceList, len(*in))
for key, val := range *in {
(*out)[key] = val.DeepCopy()
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MemoryReservation.
func (in *MemoryReservation) DeepCopy() *MemoryReservation {
if in == nil {
return nil
}
out := new(MemoryReservation)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SerializedNodeConfigSource) DeepCopyInto(out *SerializedNodeConfigSource) {
*out = *in

View File

@@ -128,6 +128,7 @@ go_test(
"hugepages_test.go",
"image_id_test.go",
"log_path_test.go",
"memory_manager_test.go",
"mirror_pod_grace_period_test.go",
"mirror_pod_test.go",
"node_container_manager_test.go",
@@ -160,6 +161,7 @@ go_test(
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/cm/memorymanager/state:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
@@ -226,6 +228,7 @@ go_test(
"//vendor/github.com/onsi/gomega/types:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:android": [
"//pkg/kubelet/stats/pidlimit:go_default_library",

View File

@@ -0,0 +1,567 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/utils/pointer"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
)
const (
evictionHardMemory = "memory.available"
memoryManagerStateFile = "/var/lib/kubelet/memory_manager_state"
resourceMemory = "memory"
staticPolicy = "Static"
nonePolicy = "None"
)
// Helper for makeMemoryManagerPod().
type memoryManagerCtnAttributes struct {
ctnName string
cpus string
memory string
hugepages2Mi string
}
// makeMemoryManagerContainers returns a slice of containers with the provided attributes, and indicates whether a hugepages mount is needed for them.
func makeMemoryManagerContainers(ctnCmd string, ctnAttributes []memoryManagerCtnAttributes) ([]v1.Container, bool) {
hugepagesMount := false
var containers []v1.Container
for _, ctnAttr := range ctnAttributes {
ctn := v1.Container{
Name: ctnAttr.ctnName,
Image: busyboxImage,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(ctnAttr.cpus),
v1.ResourceMemory: resource.MustParse(ctnAttr.memory),
},
},
Command: []string{"sh", "-c", ctnCmd},
}
if ctnAttr.hugepages2Mi != "" {
hugepagesMount = true
ctn.Resources.Limits[hugepagesResourceName2Mi] = resource.MustParse(ctnAttr.hugepages2Mi)
ctn.VolumeMounts = []v1.VolumeMount{
{
Name: "hugepages-2mi",
MountPath: "/hugepages-2Mi",
},
}
}
containers = append(containers, ctn)
}
return containers, hugepagesMount
}
// makeMemoryManagerPod returns a pod with the provided ctnAttributes.
func makeMemoryManagerPod(podName string, initCtnAttributes, ctnAttributes []memoryManagerCtnAttributes) *v1.Pod {
hugepagesMount := false
memsetCmd := "grep Mems_allowed_list /proc/self/status | cut -f2"
memsetSleepCmd := memsetCmd + "&& sleep 1d"
var containers, initContainers []v1.Container
if len(initCtnAttributes) > 0 {
initContainers, _ = makeMemoryManagerContainers(memsetCmd, initCtnAttributes)
}
containers, hugepagesMount = makeMemoryManagerContainers(memsetSleepCmd, ctnAttributes)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: podName,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: containers,
InitContainers: initContainers,
},
}
if hugepagesMount {
pod.Spec.Volumes = []v1.Volume{
{
Name: "hugepages-2mi",
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{
Medium: mediumHugepages2Mi,
},
},
},
}
}
return pod
}
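For context (assumed output shape, not taken from the diff): the memset command above greps the kernel's "Mems_allowed_list:" line from /proc/self/status and cuts out its value, so the log of a container pinned to NUMA node 0 would contain just

0

(or e.g. "0-1" for a container allowed to use both nodes); verifyMemoryPinning below parses that value with cpuset.Parse before comparing it with the expected NUMA node IDs.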
func deleteMemoryManagerStateFile() {
err := exec.Command("/bin/sh", "-c", fmt.Sprintf("rm -f %s", memoryManagerStateFile)).Run()
framework.ExpectNoError(err, "failed to delete the state file")
}
func getMemoryManagerState() (*state.MemoryManagerCheckpoint, error) {
if _, err := os.Stat(memoryManagerStateFile); os.IsNotExist(err) {
return nil, fmt.Errorf("the memory manager state file %s does not exist", memoryManagerStateFile)
}
out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat %s", memoryManagerStateFile)).Output()
if err != nil {
return nil, fmt.Errorf("failed to run command 'cat %s': out: %s, err: %v", memoryManagerStateFile, out, err)
}
memoryManagerCheckpoint := &state.MemoryManagerCheckpoint{}
if err := json.Unmarshal(out, memoryManagerCheckpoint); err != nil {
return nil, fmt.Errorf("failed to unmarshal memory manager state file: %v", err)
}
return memoryManagerCheckpoint, nil
}
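A hedged usage sketch (field names mirrored from the rejection test later in this file rather than verified against the state package) that iterates the parsed checkpoint and logs the free conventional memory per NUMA node:

stateData, err := getMemoryManagerState()
framework.ExpectNoError(err)
for numaNodeID, numaNodeState := range stateData.MachineState {
	free := numaNodeState.MemoryMap[v1.ResourceMemory].Free
	framework.Logf("NUMA node %d: %d bytes of memory free", numaNodeID, free)
}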
type kubeletParams struct {
memoryManagerFeatureGate bool
memoryManagerPolicy string
systemReservedMemory []kubeletconfig.MemoryReservation
systemReserved map[string]string
kubeReserved map[string]string
evictionHard map[string]string
}
func getUpdatedKubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration, params *kubeletParams) *kubeletconfig.KubeletConfiguration {
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
newCfg.FeatureGates = map[string]bool{}
}
newCfg.FeatureGates["MemoryManager"] = params.memoryManagerFeatureGate
newCfg.MemoryManagerPolicy = params.memoryManagerPolicy
// update system-reserved
if newCfg.SystemReserved == nil {
newCfg.SystemReserved = map[string]string{}
}
for resourceName, value := range params.systemReserved {
newCfg.SystemReserved[resourceName] = value
}
// update kube-reserved
if newCfg.KubeReserved == nil {
newCfg.KubeReserved = map[string]string{}
}
for resourceName, value := range params.kubeReserved {
newCfg.KubeReserved[resourceName] = value
}
// update hard eviction threshold
if newCfg.EvictionHard == nil {
newCfg.EvictionHard = map[string]string{}
}
for resourceName, value := range params.evictionHard {
newCfg.EvictionHard[resourceName] = value
}
// update reserved memory
if newCfg.ReservedMemory == nil {
newCfg.ReservedMemory = []kubeletconfig.MemoryReservation{}
}
for _, memoryReservation := range params.systemReservedMemory {
newCfg.ReservedMemory = append(newCfg.ReservedMemory, memoryReservation)
}
return newCfg
}
func updateKubeletConfig(f *framework.Framework, cfg *kubeletconfig.KubeletConfiguration) {
// remove the state file
deleteMemoryManagerStateFile()
// Update the Kubelet configuration
framework.ExpectNoError(setKubeletConfiguration(f, cfg))
// Wait for the Kubelet to be ready.
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
}
func getAllNUMANodes() []int {
outData, err := exec.Command("/bin/sh", "-c", "lscpu").Output()
framework.ExpectNoError(err)
numaNodeRegex, err := regexp.Compile(`NUMA node(\d+) CPU\(s\):`)
framework.ExpectNoError(err)
matches := numaNodeRegex.FindAllSubmatch(outData, -1)
var numaNodes []int
for _, m := range matches {
n, err := strconv.Atoi(string(m[1]))
framework.ExpectNoError(err)
numaNodes = append(numaNodes, n)
}
sort.Ints(numaNodes)
return numaNodes
}
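For reference, a typical lscpu excerpt the regular expression above is meant to match (spacing assumed; it varies by distro):

NUMA node0 CPU(s):   0-23
NUMA node1 CPU(s):   24-47

which would yield numaNodes == []int{0, 1}.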
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager][NodeAlphaFeature:MemoryManager]", func() {
// TODO: add more complex tests that will include interaction between CPUManager, MemoryManager and TopologyManager
var (
allNUMANodes []int
ctnParams, initCtnParams []memoryManagerCtnAttributes
is2MiHugepagesSupported *bool
isMultiNUMASupported *bool
kubeParams *kubeletParams
oldCfg *kubeletconfig.KubeletConfiguration
testPod *v1.Pod
)
f := framework.NewDefaultFramework("memory-manager-test")
memoryQuantity := resource.MustParse("1100Mi")
defaultKubeParams := &kubeletParams{
memoryManagerFeatureGate: true,
systemReservedMemory: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
resourceMemory: memoryQuantity,
},
},
},
systemReserved: map[string]string{resourceMemory: "500Mi"},
kubeReserved: map[string]string{resourceMemory: "500Mi"},
evictionHard: map[string]string{evictionHardMemory: "100Mi"},
}
verifyMemoryPinning := func(pod *v1.Pod, numaNodeIDs []int) {
ginkgo.By("Verifying the NUMA pinning")
output, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
framework.ExpectNoError(err)
currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n"))
framework.ExpectNoError(err)
framework.ExpectEqual(numaNodeIDs, currentNUMANodeIDs.ToSlice())
}
ginkgo.BeforeEach(func() {
if isMultiNUMASupported == nil {
isMultiNUMASupported = pointer.BoolPtr(isMultiNUMA())
}
if is2MiHugepagesSupported == nil {
is2MiHugepagesSupported = pointer.BoolPtr(isHugePageAvailable(hugepagesSize2M))
}
if len(allNUMANodes) == 0 {
allNUMANodes = getAllNUMANodes()
}
})
// dynamically update the kubelet configuration
ginkgo.JustBeforeEach(func() {
var err error
// allocate hugepages
if *is2MiHugepagesSupported {
err := configureHugePages(hugepagesSize2M, 256)
framework.ExpectNoError(err)
}
// get the old kubelet config
oldCfg, err = getCurrentKubeletConfig()
framework.ExpectNoError(err)
// update the kubelet config with new parameters
newCfg := getUpdatedKubeletConfig(oldCfg, kubeParams)
updateKubeletConfig(f, newCfg)
// request hugepages resources under the container
if *is2MiHugepagesSupported {
for i := 0; i < len(ctnParams); i++ {
ctnParams[i].hugepages2Mi = "128Mi"
}
}
testPod = makeMemoryManagerPod(ctnParams[0].ctnName, initCtnParams, ctnParams)
})
ginkgo.JustAfterEach(func() {
// delete the test pod
if testPod.Name != "" {
f.PodClient().DeleteSync(testPod.Name, metav1.DeleteOptions{}, 2*time.Minute)
}
// release hugepages
gomega.Eventually(func() error {
return configureHugePages(hugepagesSize2M, 0)
}, 90*time.Second, 15*time.Second).ShouldNot(gomega.HaveOccurred(), "failed to release hugepages")
// update the kubelet config with old values
updateKubeletConfig(f, oldCfg)
})
ginkgo.Context("with static policy", func() {
ginkgo.BeforeEach(func() {
// override kubelet configuration parameters
tmpParams := *defaultKubeParams
tmpParams.memoryManagerPolicy = staticPolicy
kubeParams = &tmpParams
})
ginkgo.JustAfterEach(func() {
// reset containers attributes
ctnParams = []memoryManagerCtnAttributes{}
initCtnParams = []memoryManagerCtnAttributes{}
})
ginkgo.When("guaranteed pod has init and app containers", func() {
ginkgo.BeforeEach(func() {
// override containers parameters
ctnParams = []memoryManagerCtnAttributes{
{
ctnName: "memory-manager-static",
cpus: "100m",
memory: "128Mi",
},
}
// override init container parameters
initCtnParams = []memoryManagerCtnAttributes{
{
ctnName: "init-memory-manager-static",
cpus: "100m",
memory: "128Mi",
},
}
})
ginkgo.It("should succeed to start the pod", func() {
ginkgo.By("Running the test pod")
testPod = f.PodClient().CreateSync(testPod)
// there is no point in verifying NUMA pinning when the node has only a single NUMA node
if !*isMultiNUMASupported {
return
}
verifyMemoryPinning(testPod, []int{0})
})
})
ginkgo.When("guaranteed pod has only app containers", func() {
ginkgo.BeforeEach(func() {
// override containers parameters
ctnParams = []memoryManagerCtnAttributes{
{
ctnName: "memory-manager-static",
cpus: "100m",
memory: "128Mi",
},
}
})
ginkgo.It("should succeed to start the pod", func() {
ginkgo.By("Running the test pod")
testPod = f.PodClient().CreateSync(testPod)
// there is no point in verifying NUMA pinning when the node has only a single NUMA node
if !*isMultiNUMASupported {
return
}
verifyMemoryPinning(testPod, []int{0})
})
})
ginkgo.When("multiple guaranteed pods started", func() {
var testPod2 *v1.Pod
ginkgo.BeforeEach(func() {
// override containers parameters
ctnParams = []memoryManagerCtnAttributes{
{
ctnName: "memory-manager-static",
cpus: "100m",
memory: "128Mi",
},
}
})
ginkgo.JustBeforeEach(func() {
testPod2 = makeMemoryManagerPod("memory-manager-static", initCtnParams, ctnParams)
})
ginkgo.It("should succeed to start all pods", func() {
ginkgo.By("Running the test pod and the test pod 2")
testPod = f.PodClient().CreateSync(testPod)
ginkgo.By("Running the test pod 2")
testPod2 = f.PodClient().CreateSync(testPod2)
// there is no point in verifying NUMA pinning when the node has only a single NUMA node
if !*isMultiNUMASupported {
return
}
verifyMemoryPinning(testPod, []int{0})
verifyMemoryPinning(testPod2, []int{0})
})
ginkgo.JustAfterEach(func() {
// delete the test pod 2
if testPod2.Name != "" {
f.PodClient().DeleteSync(testPod2.Name, metav1.DeleteOptions{}, 2*time.Minute)
}
})
})
// the test requires at least two NUMA nodes
// on each NUMA node, the test starts a workload pod that consumes almost all of the node's memory, leaving only 256Mi free
// afterwards, it starts an additional pod whose memory request cannot be satisfied by the free memory of any single NUMA node
ginkgo.When("guaranteed pod memory request is bigger than free memory on each NUMA node", func() {
var workloadPods []*v1.Pod
ginkgo.BeforeEach(func() {
if !*isMultiNUMASupported {
ginkgo.Skip("The machines has less than two NUMA nodes")
}
ctnParams = []memoryManagerCtnAttributes{
{
ctnName: "memory-manager-static",
cpus: "100m",
memory: "384Mi",
},
}
})
ginkgo.JustBeforeEach(func() {
stateData, err := getMemoryManagerState()
framework.ExpectNoError(err)
for _, memoryState := range stateData.MachineState {
// consume all memory except 256Mi on each NUMA node via workload pods
workloadPodMemory := memoryState.MemoryMap[v1.ResourceMemory].Free - 256*1024*1024
memoryQuantity := resource.NewQuantity(int64(workloadPodMemory), resource.BinarySI)
workloadCtnAttrs := []memoryManagerCtnAttributes{
{
ctnName: "workload-pod",
cpus: "100m",
memory: memoryQuantity.String(),
},
}
workloadPod := makeMemoryManagerPod(workloadCtnAttrs[0].ctnName, initCtnParams, workloadCtnAttrs)
workloadPod = f.PodClient().CreateSync(workloadPod)
workloadPods = append(workloadPods, workloadPod)
}
})
ginkgo.It("should be rejected", func() {
ginkgo.By("Creating the pod")
testPod = f.PodClient().Create(testPod)
ginkgo.By("Checking that pod failed to start because of admission error")
gomega.Eventually(func() bool {
tmpPod, err := f.PodClient().Get(context.TODO(), testPod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
if tmpPod.Status.Phase != v1.PodFailed {
return false
}
if tmpPod.Status.Reason != "UnexpectedAdmissionError" {
return false
}
if !strings.Contains(tmpPod.Status.Message, "Pod Allocate failed due to [memorymanager]") {
return false
}
return true
}, time.Minute, 5*time.Second).Should(
gomega.Equal(true),
"the pod succeeded to start, when it should fail with the admission error",
)
})
ginkgo.JustAfterEach(func() {
for _, workloadPod := range workloadPods {
if workloadPod.Name != "" {
f.PodClient().DeleteSync(workloadPod.Name, metav1.DeleteOptions{}, 2*time.Minute)
}
}
})
})
})
ginkgo.Context("with none policy", func() {
ginkgo.BeforeEach(func() {
tmpParams := *defaultKubeParams
tmpParams.memoryManagerPolicy = nonePolicy
kubeParams = &tmpParams
// override pod parameters
ctnParams = []memoryManagerCtnAttributes{
{
ctnName: "memory-manager-none",
cpus: "100m",
memory: "128Mi",
},
}
})
ginkgo.It("should succeed to start the pod", func() {
testPod = f.PodClient().CreateSync(testPod)
// there is no point in verifying NUMA pinning when the node has only a single NUMA node
if !*isMultiNUMASupported {
return
}
verifyMemoryPinning(testPod, allNUMANodes)
})
})
})