Merge pull request #104153 from cynepco3hahue/e2e_node_provide_static_kubelet_config

e2e node: provide static kubelet config
Authored by Kubernetes Prow Robot on 2021-11-04 17:11:53 -07:00; committed by GitHub
commit adcd2feb5e
11 changed files with 626 additions and 665 deletions


@@ -34,13 +34,12 @@ import (
 	cpumanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/types"
-	"k8s.io/kubernetes/test/e2e/framework"
-	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
-	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
-	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 )

 // Helper for makeCPUManagerPod().
@@ -60,12 +59,12 @@ func makeCPUManagerPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod {
 			Image: busyboxImage,
 			Resources: v1.ResourceRequirements{
 				Requests: v1.ResourceList{
-					v1.ResourceName(v1.ResourceCPU):    resource.MustParse(ctnAttr.cpuRequest),
-					v1.ResourceName(v1.ResourceMemory): resource.MustParse("100Mi"),
+					v1.ResourceCPU:    resource.MustParse(ctnAttr.cpuRequest),
+					v1.ResourceMemory: resource.MustParse("100Mi"),
 				},
 				Limits: v1.ResourceList{
-					v1.ResourceName(v1.ResourceCPU):    resource.MustParse(ctnAttr.cpuLimit),
-					v1.ResourceName(v1.ResourceMemory): resource.MustParse("100Mi"),
+					v1.ResourceCPU:    resource.MustParse(ctnAttr.cpuLimit),
+					v1.ResourceMemory: resource.MustParse("100Mi"),
 				},
 			},
 			Command: []string{"sh", "-c", cpusetCmd},
@@ -109,7 +108,7 @@ func getLocalNodeCPUDetails(f *framework.Framework) (cpuCapVal int64, cpuAllocVa
 	// RoundUp reserved CPUs to get only integer cores.
 	cpuRes.RoundUp(0)
-	return cpuCap.Value(), (cpuCap.Value() - cpuRes.Value()), cpuRes.Value()
+	return cpuCap.Value(), cpuCap.Value() - cpuRes.Value(), cpuRes.Value()
 }

 func waitForContainerRemoval(containerName, podName, podNS string) {
@@ -185,76 +184,35 @@ func getCoreSiblingList(cpuRes int64) string {
 	return string(out)
 }

-func deleteStateFile() {
-	err := exec.Command("/bin/sh", "-c", "rm -f /var/lib/kubelet/cpu_manager_state").Run()
-	framework.ExpectNoError(err, "error deleting state file")
+type cpuManagerKubeletArguments struct {
+	policyName              string
+	enableCPUManager        bool
+	enableCPUManagerOptions bool
+	reservedSystemCPUs      cpuset.CPUSet
+	options                 map[string]string
 }

-func setOldKubeletConfig(f *framework.Framework, oldCfg *kubeletconfig.KubeletConfiguration) {
-	// Delete the CPU Manager state file so that the old Kubelet configuration
-	// can take effect.i
-	deleteStateFile()
-	if oldCfg != nil {
-		framework.ExpectNoError(setKubeletConfiguration(f, oldCfg))
-	}
-}
-
-func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
-	// Disable CPU Manager in Kubelet.
-	oldCfg, err := getCurrentKubeletConfig()
-	framework.ExpectNoError(err)
+func configureCPUManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguration, kubeletArguments *cpuManagerKubeletArguments) *kubeletconfig.KubeletConfiguration {
 	newCfg := oldCfg.DeepCopy()
 	if newCfg.FeatureGates == nil {
 		newCfg.FeatureGates = make(map[string]bool)
 	}
-	newCfg.FeatureGates["CPUManager"] = false
-	// Update the Kubelet configuration.
-	framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
-	// Wait for the Kubelet to be ready.
-	gomega.Eventually(func() bool {
-		nodes, err := e2enode.TotalReady(f.ClientSet)
-		framework.ExpectNoError(err)
-		return nodes == 1
-	}, time.Minute, time.Second).Should(gomega.BeTrue())
-	return oldCfg
-}
-
-func configureCPUManagerInKubelet(f *framework.Framework, policyName string, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet, enableOptions bool, options map[string]string) (oldCfg *kubeletconfig.KubeletConfiguration) {
-	// Enable CPU Manager in Kubelet with static policy.
-	oldCfg, err := getCurrentKubeletConfig()
-	framework.ExpectNoError(err)
-	newCfg := oldCfg.DeepCopy()
-	if newCfg.FeatureGates == nil {
-		newCfg.FeatureGates = make(map[string]bool)
-	}
-	newCfg.FeatureGates["CPUManager"] = true
-	newCfg.FeatureGates["CPUManagerPolicyOptions"] = enableOptions
-	newCfg.FeatureGates["CPUManagerPolicyBetaOptions"] = enableOptions
-	newCfg.FeatureGates["CPUManagerPolicyAlphaOptions"] = enableOptions
-	// After graduation of the CPU Manager feature to Beta, the CPU Manager
-	// "none" policy is ON by default. But when we set the CPU Manager policy to
-	// "static" in this test and the Kubelet is restarted so that "static"
-	// policy can take effect, there will always be a conflict with the state
-	// checkpointed in the disk (i.e., the policy checkpointed in the disk will
-	// be "none" whereas we are trying to restart Kubelet with "static"
-	// policy). Therefore, we delete the state file so that we can proceed
-	// with the tests.
-	// Only delete the state file at the begin of the tests.
-	if cleanStateFile {
-		deleteStateFile()
-	}
-	newCfg.CPUManagerPolicy = policyName
+	newCfg.FeatureGates["CPUManager"] = kubeletArguments.enableCPUManager
+	newCfg.FeatureGates["CPUManagerPolicyOptions"] = kubeletArguments.enableCPUManagerOptions
+	newCfg.FeatureGates["CPUManagerPolicyBetaOptions"] = kubeletArguments.enableCPUManagerOptions
+	newCfg.FeatureGates["CPUManagerPolicyAlphaOptions"] = kubeletArguments.enableCPUManagerOptions
+	newCfg.CPUManagerPolicy = kubeletArguments.policyName
 	newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
-	newCfg.CPUManagerPolicyOptions = options
-	if reservedSystemCPUs.Size() > 0 {
-		cpus := reservedSystemCPUs.String()
+	if kubeletArguments.options != nil {
+		newCfg.CPUManagerPolicyOptions = kubeletArguments.options
+	}
+	if kubeletArguments.reservedSystemCPUs.Size() > 0 {
+		cpus := kubeletArguments.reservedSystemCPUs.String()
 		framework.Logf("configureCPUManagerInKubelet: using reservedSystemCPUs=%q", cpus)
 		newCfg.ReservedSystemCPUs = cpus
 	} else {
@@ -269,15 +227,6 @@ func configureCPUManagerInKubelet(f *framework.Framework, policyName string, cle
 			newCfg.KubeReserved["cpu"] = "200m"
 		}
 	}
-	// Update the Kubelet configuration.
-	framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
-	// Wait for the Kubelet to be ready.
-	gomega.Eventually(func() bool {
-		nodes, err := e2enode.TotalReady(f.ClientSet)
-		framework.ExpectNoError(err)
-		return nodes == 1
-	}, time.Minute, time.Second).Should(gomega.BeTrue())
 	return oldCfg
 }
@@ -581,6 +530,12 @@ func runCPUManagerTests(f *framework.Framework) {
 	var ctnAttrs []ctnAttribute
 	var pod *v1.Pod

+	ginkgo.BeforeEach(func() {
+		var err error
+		oldCfg, err = getCurrentKubeletConfig()
+		framework.ExpectNoError(err)
+	})
+
 	ginkgo.It("should assign CPUs as expected based on the Pod spec", func() {
 		cpuCap, cpuAlloc, _ = getLocalNodeCPUDetails(f)
@@ -590,7 +545,12 @@ func runCPUManagerTests(f *framework.Framework) {
 		}

 		// Enable CPU Manager in the kubelet.
-		oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), true, cpuset.CPUSet{}, false, nil)
+		newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
+			policyName:         string(cpumanager.PolicyStatic),
+			enableCPUManager:   true,
+			reservedSystemCPUs: cpuset.CPUSet{},
+		})
+		updateKubeletConfig(f, newCfg, true)

 		ginkgo.By("running a non-Gu pod")
 		runNonGuPodTest(f, cpuCap)
@@ -650,14 +610,24 @@ func runCPUManagerTests(f *framework.Framework) {
 			pod.Spec.Containers[0].Name, pod.Name)

 		ginkgo.By("disable cpu manager in kubelet")
-		disableCPUManagerInKubelet(f)
+		newCfg = configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
+			policyName:         string(cpumanager.PolicyStatic),
+			enableCPUManager:   false,
+			reservedSystemCPUs: cpuset.CPUSet{},
+		})
+		updateKubeletConfig(f, newCfg, false)

 		ginkgo.By("by deleting the pod and waiting for container removal")
 		deletePods(f, []string{pod.Name})
 		waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)

 		ginkgo.By("enable cpu manager in kubelet without delete state file")
-		configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), false, cpuset.CPUSet{}, false, nil)
+		newCfg = configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
+			policyName:         string(cpumanager.PolicyStatic),
+			enableCPUManager:   true,
+			reservedSystemCPUs: cpuset.CPUSet{},
+		})
+		updateKubeletConfig(f, newCfg, false)

 		ginkgo.By("wait for the deleted pod to be cleaned up from the state file")
 		waitForStateFileCleanedUp()
@@ -681,13 +651,21 @@ func runCPUManagerTests(f *framework.Framework) {
 		framework.Logf("SMT level %d", smtLevel)

-		cleanStateFile := true
 		// TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably
 		// check what we do have in the node.
 		cpuPolicyOptions := map[string]string{
 			cpumanager.FullPCPUsOnlyOption: "true",
 		}
-		oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), cleanStateFile, cpuset.NewCPUSet(0), true, cpuPolicyOptions)
+		newCfg := configureCPUManagerInKubelet(oldCfg,
+			&cpuManagerKubeletArguments{
+				policyName:              string(cpumanager.PolicyStatic),
+				enableCPUManager:        true,
+				reservedSystemCPUs:      cpuset.NewCPUSet(0),
+				enableCPUManagerOptions: true,
+				options:                 cpuPolicyOptions,
+			},
+		)
+		updateKubeletConfig(f, newCfg, true)

 		// the order between negative and positive doesn't really matter
 		runSMTAlignmentNegativeTests(f)
@@ -695,7 +673,7 @@ func runCPUManagerTests(f *framework.Framework) {
 	})

 	ginkgo.AfterEach(func() {
-		setOldKubeletConfig(f, oldCfg)
+		updateKubeletConfig(f, oldCfg, true)
 	})
 }
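The refactor above separates building a kubelet configuration from applying it: configureCPUManagerInKubelet now only returns a modified copy of the supplied config, and the call sites pair it with updateKubeletConfig(f, cfg, ...), whose boolean argument appears, from these call sites, to control whether stale manager state files are removed before the kubelet restart. A minimal sketch of the intended call pattern, assuming the helpers of this e2e_node test package (getCurrentKubeletConfig, updateKubeletConfig) and the types shown in the diff:

// Sketch only: relies on helpers from this test package as shown in the diff above.
func enableStaticCPUManagerForTest(f *framework.Framework) *kubeletconfig.KubeletConfiguration {
	// Snapshot the running configuration so an AfterEach can restore it later.
	oldCfg, err := getCurrentKubeletConfig()
	framework.ExpectNoError(err)

	// Build a modified copy; nothing is applied to the node yet.
	newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
		policyName:         string(cpumanager.PolicyStatic),
		enableCPUManager:   true,
		reservedSystemCPUs: cpuset.NewCPUSet(0),
	})

	// Write the config to disk, restart the kubelet, and (true) drop stale
	// state files so the static policy can take effect.
	updateKubeletConfig(f, newCfg, true)
	return oldCfg
}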


@@ -61,15 +61,6 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
 			e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
 		}

-		oldCfg := enablePodResourcesFeatureGateInKubelet(f)
-		defer func() {
-			// restore kubelet config
-			setOldKubeletConfig(f, oldCfg)
-
-			// Delete state file to allow repeated runs
-			deleteStateFile()
-		}()
-
 		configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
 		sd := setupSRIOVConfigOrFail(f, configMap)
@@ -167,15 +158,6 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
 			e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
 		}

-		oldCfg := enablePodResourcesFeatureGateInKubelet(f)
-		defer func() {
-			// restore kubelet config
-			setOldKubeletConfig(f, oldCfg)
-
-			// Delete state file to allow repeated runs
-			deleteStateFile()
-		}()
-
 		endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
 		framework.ExpectNoError(err)


@@ -110,7 +110,6 @@ func readDaemonSetV1OrDie(objBytes []byte) *appsv1.DaemonSet {
 func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 	pluginSockDir = filepath.Join(pluginSockDir) + "/"
 	ginkgo.Context("DevicePlugin", func() {
-		ginkgo.By("Enabling support for Kubelet Plugins Watcher")
 		ginkgo.It("[Flaky] Verifies the Kubelet device plugin functionality.", func() {
 			ginkgo.By("Wait for node is ready to start with")
 			e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)


@@ -70,7 +70,7 @@ type nodeConfigTestCase struct {
 }

 // This test is marked [Disruptive] because the Kubelet restarts several times during this test.
-var _ = SIGDescribe("[Feature:DynamicKubeletConfig][NodeFeature:DynamicKubeletConfig][Serial][Disruptive]", func() {
+var _ = SIGDescribe("[Feature:DynamicKubeletConfig][NodeFeature:DynamicKubeletConfig][Deprecated][Disruptive]", func() {
 	f := framework.NewDefaultFramework("dynamic-kubelet-configuration-test")
 	var beforeNode *v1.Node
 	var beforeConfigMap *v1.ConfigMap


@@ -0,0 +1,95 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletconfig
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletconfigscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
"sigs.k8s.io/yaml"
)
func getKubeletConfigFilePath() (string, error) {
cwd, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("failed to get current working directory: %w", err)
}
// DO NOT name this file "kubelet" - you will overwrite the kubelet binary and be very confused :)
return filepath.Join(cwd, "kubelet-config"), nil
}
// GetCurrentKubeletConfigFromFile returns the current kubelet configuration under the filesystem.
// This method should only run together with e2e node tests, meaning the test executor and the cluster nodes is the
// same machine
func GetCurrentKubeletConfigFromFile() (*kubeletconfig.KubeletConfiguration, error) {
kubeletConfigFilePath, err := getKubeletConfigFilePath()
if err != nil {
return nil, err
}
data, err := ioutil.ReadFile(kubeletConfigFilePath)
if err != nil {
return nil, fmt.Errorf("failed to get the kubelet config from the file %q: %w", kubeletConfigFilePath, err)
}
var kubeletConfigV1Beta1 kubeletconfigv1beta1.KubeletConfiguration
if err := yaml.Unmarshal(data, &kubeletConfigV1Beta1); err != nil {
return nil, fmt.Errorf("failed to unmarshal the kubelet config: %w", err)
}
scheme, _, err := kubeletconfigscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
kubeletConfig := kubeletconfig.KubeletConfiguration{}
err = scheme.Convert(&kubeletConfigV1Beta1, &kubeletConfig, nil)
if err != nil {
return nil, err
}
return &kubeletConfig, nil
}
// WriteKubeletConfigFile updates the kubelet configuration under the filesystem
// This method should only run together with e2e node tests, meaning the test executor and the cluster nodes is the
// same machine
func WriteKubeletConfigFile(kubeletConfig *kubeletconfig.KubeletConfiguration) error {
data, err := kubeletconfigcodec.EncodeKubeletConfig(kubeletConfig, kubeletconfigv1beta1.SchemeGroupVersion)
if err != nil {
return err
}
kubeletConfigFilePath, err := getKubeletConfigFilePath()
if err != nil {
return err
}
if err := ioutil.WriteFile(kubeletConfigFilePath, data, 0644); err != nil {
return fmt.Errorf("failed to write the kubelet file to %q: %w", kubeletConfigFilePath, err)
}
return nil
}
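Since these new helpers read and write the kubelet configuration file that sits next to the test binary, a typical round trip looks like the sketch below. It assumes it runs on the node itself with the "kubelet-config" file in the current working directory, as getKubeletConfigFilePath above requires; the kubelet still has to be restarted separately for the change to take effect.

package main

import (
	"fmt"

	e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"
)

func main() {
	// Decode the on-disk kubelet configuration into the internal type.
	cfg, err := e2enodekubelet.GetCurrentKubeletConfigFromFile()
	if err != nil {
		panic(err)
	}

	// Example mutation; any KubeletConfiguration field could be changed here.
	cfg.CPUManagerPolicy = "static"

	// Re-encode to v1beta1 and overwrite the same file.
	if err := e2enodekubelet.WriteKubeletConfigFile(cfg); err != nil {
		panic(err)
	}
	fmt.Println("kubelet-config updated on disk; restart the kubelet to apply it")
}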


@@ -1,3 +1,6 @@
+//go:build linux
+// +build linux
+
 /*
 Copyright 2017 The Kubernetes Authors.
@@ -38,7 +41,6 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/util"
 	"k8s.io/kubernetes/test/e2e/framework"
-	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	"k8s.io/utils/pointer"
@@ -48,7 +50,6 @@ import (
 const (
 	evictionHardMemory = "memory.available"
-	memoryManagerStateFile = "/var/lib/kubelet/memory_manager_state"
 	resourceMemory     = "memory"
 	staticPolicy       = "Static"
 	nonePolicy         = "None"
@@ -134,11 +135,6 @@ func makeMemoryManagerPod(podName string, initCtnAttributes, ctnAttributes []mem
 	return pod
 }

-func deleteMemoryManagerStateFile() {
-	err := exec.Command("/bin/sh", "-c", fmt.Sprintf("rm -f %s", memoryManagerStateFile)).Run()
-	framework.ExpectNoError(err, "failed to delete the state file")
-}
-
 func getMemoryManagerState() (*state.MemoryManagerCheckpoint, error) {
 	if _, err := os.Stat(memoryManagerStateFile); os.IsNotExist(err) {
 		return nil, fmt.Errorf("the memory manager state file %s does not exist", memoryManagerStateFile)
@@ -183,63 +179,44 @@ type kubeletParams struct {
 	evictionHard map[string]string
 }

-func getUpdatedKubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration, params *kubeletParams) *kubeletconfig.KubeletConfiguration {
-	newCfg := oldCfg.DeepCopy()
-
-	if newCfg.FeatureGates == nil {
-		newCfg.FeatureGates = map[string]bool{}
+func updateKubeletConfigWithMemoryManagerParams(initialCfg *kubeletconfig.KubeletConfiguration, params *kubeletParams) {
+	if initialCfg.FeatureGates == nil {
+		initialCfg.FeatureGates = map[string]bool{}
 	}
-	newCfg.MemoryManagerPolicy = params.memoryManagerPolicy
+	initialCfg.MemoryManagerPolicy = params.memoryManagerPolicy

 	// update system-reserved
-	if newCfg.SystemReserved == nil {
-		newCfg.SystemReserved = map[string]string{}
+	if initialCfg.SystemReserved == nil {
+		initialCfg.SystemReserved = map[string]string{}
 	}
 	for resourceName, value := range params.systemReserved {
-		newCfg.SystemReserved[resourceName] = value
+		initialCfg.SystemReserved[resourceName] = value
 	}

 	// update kube-reserved
-	if newCfg.KubeReserved == nil {
-		newCfg.KubeReserved = map[string]string{}
+	if initialCfg.KubeReserved == nil {
+		initialCfg.KubeReserved = map[string]string{}
 	}
 	for resourceName, value := range params.kubeReserved {
-		newCfg.KubeReserved[resourceName] = value
+		initialCfg.KubeReserved[resourceName] = value
 	}

 	// update hard eviction threshold
-	if newCfg.EvictionHard == nil {
-		newCfg.EvictionHard = map[string]string{}
+	if initialCfg.EvictionHard == nil {
+		initialCfg.EvictionHard = map[string]string{}
 	}
 	for resourceName, value := range params.evictionHard {
-		newCfg.EvictionHard[resourceName] = value
+		initialCfg.EvictionHard[resourceName] = value
 	}

 	// update reserved memory
-	if newCfg.ReservedMemory == nil {
-		newCfg.ReservedMemory = []kubeletconfig.MemoryReservation{}
+	if initialCfg.ReservedMemory == nil {
+		initialCfg.ReservedMemory = []kubeletconfig.MemoryReservation{}
 	}
 	for _, memoryReservation := range params.systemReservedMemory {
-		newCfg.ReservedMemory = append(newCfg.ReservedMemory, memoryReservation)
+		initialCfg.ReservedMemory = append(initialCfg.ReservedMemory, memoryReservation)
 	}
-
-	return newCfg
-}
-
-func updateKubeletConfig(f *framework.Framework, cfg *kubeletconfig.KubeletConfiguration) {
-	// remove the state file
-	deleteMemoryManagerStateFile()
-
-	// Update the Kubelet configuration
-	framework.ExpectNoError(setKubeletConfiguration(f, cfg))
-
-	// Wait for the Kubelet to be ready.
-	gomega.Eventually(func() bool {
-		nodes, err := e2enode.TotalReady(f.ClientSet)
-		framework.ExpectNoError(err)
-		return nodes == 1
-	}, time.Minute, time.Second).Should(gomega.BeTrue())
 }

 func getAllNUMANodes() []int {
@@ -264,15 +241,13 @@ func getAllNUMANodes() []int {
 }

 // Serial because the test updates kubelet configuration.
-var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager]", func() {
+var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager]", func() {
 	// TODO: add more complex tests that will include interaction between CPUManager, MemoryManager and TopologyManager
 	var (
 		allNUMANodes             []int
 		ctnParams, initCtnParams []memoryManagerCtnAttributes
 		is2MiHugepagesSupported  *bool
 		isMultiNUMASupported     *bool
-		kubeParams               *kubeletParams
-		oldCfg                   *kubeletconfig.KubeletConfiguration
 		testPod                  *v1.Pod
 	)
@@ -305,53 +280,7 @@ var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager]", func() {
 		framework.ExpectEqual(numaNodeIDs, currentNUMANodeIDs.ToSlice())
 	}

-	ginkgo.BeforeEach(func() {
-		if isMultiNUMASupported == nil {
-			isMultiNUMASupported = pointer.BoolPtr(isMultiNUMA())
-		}
-		if is2MiHugepagesSupported == nil {
-			is2MiHugepagesSupported = pointer.BoolPtr(isHugePageAvailable(hugepagesSize2M))
-		}
-		if len(allNUMANodes) == 0 {
-			allNUMANodes = getAllNUMANodes()
-		}
-	})
-
-	// dynamically update the kubelet configuration
-	ginkgo.JustBeforeEach(func() {
-		var err error
-
-		// allocate hugepages
-		if *is2MiHugepagesSupported {
-			hugepagesCount := 256
-			ginkgo.By("Configuring hugepages")
-			gomega.Eventually(func() error {
-				if err := configureHugePages(hugepagesSize2M, hugepagesCount); err != nil {
-					return err
-				}
-				return nil
-			}, 30*time.Second, framework.Poll).Should(gomega.BeNil())
-
-			ginkgo.By("restarting kubelet to pick up pre-allocated hugepages")
-			// stop the kubelet and wait until the server will restart it automatically
-			stopKubelet()
-
-			// wait until the kubelet health check will fail
-			gomega.Eventually(func() bool {
-				return kubeletHealthCheck(kubeletHealthCheckURL)
-			}, time.Minute, time.Second).Should(gomega.BeFalse())
-
-			restartKubelet(false)
-
-			// wait until the kubelet health check will pass
-			gomega.Eventually(func() bool {
-				return kubeletHealthCheck(kubeletHealthCheckURL)
-			}, 2*time.Minute, 10*time.Second).Should(gomega.BeTrue())
-
-			ginkgo.By("Waiting for hugepages resource to become available on the local node")
+	waitingForHugepages := func(hugepagesCount int) {
 		gomega.Eventually(func() error {
 			node, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), framework.TestContext.NodeName, metav1.GetOptions{})
 			if err != nil {
@@ -377,18 +306,38 @@ var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager]", func() {
 		}, time.Minute, framework.Poll).Should(gomega.BeNil())
 	}

-	// get the old kubelet config
-	oldCfg, err = getCurrentKubeletConfig()
-	framework.ExpectNoError(err)
-
-	// update the kubelet config with new parameters
-	newCfg := getUpdatedKubeletConfig(oldCfg, kubeParams)
-	updateKubeletConfig(f, newCfg)
-
-	// request hugepages resources under the container
+	ginkgo.BeforeEach(func() {
+		if isMultiNUMASupported == nil {
+			isMultiNUMASupported = pointer.BoolPtr(isMultiNUMA())
+		}
+
+		if is2MiHugepagesSupported == nil {
+			is2MiHugepagesSupported = pointer.BoolPtr(isHugePageAvailable(hugepagesSize2M))
+		}
+
+		if len(allNUMANodes) == 0 {
+			allNUMANodes = getAllNUMANodes()
+		}
+	})
+
+	// dynamically update the kubelet configuration
+	ginkgo.JustBeforeEach(func() {
+		hugepagesCount := 8
+
+		// allocate hugepages
 		if *is2MiHugepagesSupported {
+			ginkgo.By("Configuring hugepages")
+			gomega.Eventually(func() error {
+				return configureHugePages(hugepagesSize2M, hugepagesCount)
+			}, 30*time.Second, framework.Poll).Should(gomega.BeNil())
+
+			restartKubelet(true)
+
+			ginkgo.By("Waiting for hugepages resource to become available on the local node")
+			waitingForHugepages(hugepagesCount)
+
 			for i := 0; i < len(ctnParams); i++ {
-				ctnParams[i].hugepages2Mi = "128Mi"
+				ctnParams[i].hugepages2Mi = "8Mi"
 			}
 		}
@@ -404,28 +353,23 @@ var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager]", func() {
 		}

 		// release hugepages
+		if *is2MiHugepagesSupported {
+			ginkgo.By("Releasing allocated hugepages")
 			gomega.Eventually(func() error {
 				return configureHugePages(hugepagesSize2M, 0)
 			}, 90*time.Second, 15*time.Second).ShouldNot(gomega.HaveOccurred(), "failed to release hugepages")

-		// update the kubelet config with old values
-		updateKubeletConfig(f, oldCfg)
-
-		// wait until the kubelet health check will pass and will continue to pass for specified period of time
-		gomega.Eventually(func() bool {
-			return kubeletHealthCheck(kubeletHealthCheckURL)
-		}, time.Minute, 10*time.Second).Should(gomega.BeTrue())
-
-		gomega.Consistently(func() bool {
-			return kubeletHealthCheck(kubeletHealthCheckURL)
-		}, time.Minute, 10*time.Second).Should(gomega.BeTrue())
+			restartKubelet(true)
+
+			waitingForHugepages(0)
+		}
 	})

 	ginkgo.Context("with static policy", func() {
-		ginkgo.BeforeEach(func() {
-			// override kubelet configuration parameters
-			tmpParams := *defaultKubeParams
-			tmpParams.memoryManagerPolicy = staticPolicy
-			kubeParams = &tmpParams
+		tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+			kubeParams := *defaultKubeParams
+			kubeParams.memoryManagerPolicy = staticPolicy
+			updateKubeletConfigWithMemoryManagerParams(initialConfig, &kubeParams)
 		})

 		ginkgo.JustAfterEach(func() {
@@ -700,11 +644,15 @@ var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager]", func() {
 	})

 	ginkgo.Context("with none policy", func() {
-		ginkgo.BeforeEach(func() {
-			tmpParams := *defaultKubeParams
-			tmpParams.memoryManagerPolicy = nonePolicy
-			kubeParams = &tmpParams
+		tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+			kubeParams := *defaultKubeParams
+			kubeParams.memoryManagerPolicy = nonePolicy
+			updateKubeletConfigWithMemoryManagerParams(initialConfig, &kubeParams)
+		})
+
+		// empty context to configure same container parameters for all tests
+		ginkgo.Context("", func() {
+			ginkgo.BeforeEach(func() {
 				// override pod parameters
 				ctnParams = []memoryManagerCtnAttributes{
 					{
@@ -766,4 +714,5 @@ var _ = SIGDescribe("Memory Manager [Serial] [Feature:MemoryManager]", func() {
 			verifyMemoryPinning(testPod, allNUMANodes)
 		})
 	})
+	})
 })
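The reworked JustBeforeEach above pre-allocates a small number of 2Mi hugepages through configureHugePages and then restarts the kubelet so node capacity is re-read; the helper's body is not part of this diff. On Linux the usual mechanism is writing the desired page count to sysfs, roughly as in the hypothetical sketch below (the path and error handling are assumptions, not this suite's actual implementation). The kernel may also grant fewer pages than requested, which is why the tests poll with waitingForHugepages afterwards.

package main

import (
	"fmt"
	"os"
)

// setHugePages2Mi asks the kernel for the given number of 2Mi hugepages.
// Hypothetical stand-in for configureHugePages; requires root privileges.
func setHugePages2Mi(count int) error {
	const sysfsPath = "/sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages"
	return os.WriteFile(sysfsPath, []byte(fmt.Sprintf("%d", count)), 0644)
}

func main() {
	if err := setHugePages2Mi(8); err != nil {
		fmt.Fprintln(os.Stderr, "failed to configure hugepages:", err)
		os.Exit(1)
	}
}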


@@ -34,8 +34,10 @@ import (
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/cm"
 	"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"

 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -183,7 +185,24 @@ func runTest(f *framework.Framework) error {
 	defer destroyTemporaryCgroupsForReservation(cgroupManager)
 	defer func() {
 		if oldCfg != nil {
-			framework.ExpectNoError(setKubeletConfiguration(f, oldCfg))
+			// Update the Kubelet configuration.
+			ginkgo.By("Stopping the kubelet")
+			startKubelet := stopKubelet()
+
+			// wait until the kubelet health check will fail
+			gomega.Eventually(func() bool {
+				return kubeletHealthCheck(kubeletHealthCheckURL)
+			}, time.Minute, time.Second).Should(gomega.BeFalse())
+
+			framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(oldCfg))
+
+			ginkgo.By("Starting the kubelet")
+			startKubelet()
+
+			// wait until the kubelet health check will succeed
+			gomega.Eventually(func() bool {
+				return kubeletHealthCheck(kubeletHealthCheckURL)
+			}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
 		}
 	}()
 	if err := createTemporaryCgroupsForReservation(cgroupManager); err != nil {
@@ -193,7 +212,25 @@ func runTest(f *framework.Framework) error {
 	// Change existing kubelet configuration
 	setDesiredConfiguration(newCfg)
 	// Set the new kubelet configuration.
-	err = setKubeletConfiguration(f, newCfg)
+	// Update the Kubelet configuration.
+	ginkgo.By("Stopping the kubelet")
+	startKubelet := stopKubelet()
+
+	// wait until the kubelet health check will fail
+	gomega.Eventually(func() bool {
+		return kubeletHealthCheck(kubeletHealthCheckURL)
+	}, time.Minute, time.Second).Should(gomega.BeFalse())
+
+	framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(newCfg))
+
+	ginkgo.By("Starting the kubelet")
+	startKubelet()
+
+	// wait until the kubelet health check will succeed
+	gomega.Eventually(func() bool {
+		return kubeletHealthCheck(kubeletHealthCheckURL)
+	}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
+
 	if err != nil {
 		return err
 	}
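The stop/write/start sequence introduced here is repeated almost verbatim in several files of this PR. For readers, the pattern can be factored into a single helper along these lines; this is only a sketch reusing the helpers visible in the diff (stopKubelet, kubeletHealthCheck, e2enodekubelet.WriteKubeletConfigFile), not a function the PR itself adds:

// Sketch: assumes the e2e_node test package helpers shown in this diff.
func writeConfigAndRestartKubelet(cfg *kubeletconfig.KubeletConfiguration) {
	ginkgo.By("Stopping the kubelet")
	startKubelet := stopKubelet()

	// Wait until the health endpoint actually goes down before touching the config file.
	gomega.Eventually(func() bool {
		return kubeletHealthCheck(kubeletHealthCheckURL)
	}, time.Minute, time.Second).Should(gomega.BeFalse())

	// Replace the static configuration file on disk.
	framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(cfg))

	ginkgo.By("Starting the kubelet")
	startKubelet()

	// Wait until the kubelet reports healthy again with the new config.
	gomega.Eventually(func() bool {
		return kubeletHealthCheck(kubeletHealthCheckURL)
	}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
}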


@@ -24,10 +24,12 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"
 	"k8s.io/kubernetes/test/e2e_node/perf/workloads"

 	"github.com/onsi/ginkgo"
@@ -46,7 +48,24 @@ func makeNodePerfPod(w workloads.NodePerfWorkload) *v1.Pod {
 func setKubeletConfig(f *framework.Framework, cfg *kubeletconfig.KubeletConfiguration) {
 	if cfg != nil {
-		framework.ExpectNoError(setKubeletConfiguration(f, cfg))
+		// Update the Kubelet configuration.
+		ginkgo.By("Stopping the kubelet")
+		startKubelet := stopKubelet()
+
+		// wait until the kubelet health check will fail
+		gomega.Eventually(func() bool {
+			return kubeletHealthCheck(kubeletHealthCheckURL)
+		}, time.Minute, time.Second).Should(gomega.BeFalse())
+
+		framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(cfg))
+
+		ginkgo.By("Starting the kubelet")
+		startKubelet()
+
+		// wait until the kubelet health check will succeed
+		gomega.Eventually(func() bool {
+			return kubeletHealthCheck(kubeletHealthCheckURL)
+		}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
 	}

 	// Wait for the Kubelet to be ready.


@@ -28,7 +28,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -38,14 +37,13 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/util"
 	testutils "k8s.io/kubernetes/test/utils"

-	"github.com/onsi/ginkgo"
-	"github.com/onsi/gomega"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
+
+	"github.com/onsi/ginkgo"
+	"github.com/onsi/gomega"
 )

 type podDesc struct {
@@ -494,7 +492,7 @@ func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.Pod
 }

-func podresourcesGetAllocatableResourcesTests(f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData, onlineCPUs, reservedSystemCPUs cpuset.CPUSet) {
+func podresourcesGetAllocatableResourcesTests(cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData, onlineCPUs, reservedSystemCPUs cpuset.CPUSet) {
 	ginkgo.By("checking the devices known to the kubelet")
 	resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
 	framework.ExpectNoErrorWithOffset(1, err)
@@ -535,30 +533,39 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 	reservedSystemCPUs := cpuset.MustParse("1")

-	ginkgo.Context("With SRIOV devices in the system", func() {
-		ginkgo.It("should return the expected responses with cpumanager static policy enabled", func() {
+	ginkgo.Context("with SRIOV devices in the system", func() {
+		ginkgo.BeforeEach(func() {
+			requireSRIOVDevices()
+		})
+
+		ginkgo.Context("with CPU manager Static policy", func() {
+			ginkgo.BeforeEach(func() {
 				// this is a very rough check. We just want to rule out system that does NOT have enough resources
 				_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
 				if cpuAlloc < minCoreCount {
 					e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount)
 				}
+			})

-			requireSRIOVDevices()
+			// empty context to apply kubelet config changes
+			ginkgo.Context("", func() {
+				tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+					// Set the CPU Manager policy to static.
+					initialConfig.CPUManagerPolicy = string(cpumanager.PolicyStatic)
+					// Set the CPU Manager reconcile period to 1 second.
+					initialConfig.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
+					cpus := reservedSystemCPUs.String()
+					framework.Logf("configurePodResourcesInKubelet: using reservedSystemCPUs=%q", cpus)
+					initialConfig.ReservedSystemCPUs = cpus
+				})
+				ginkgo.It("should return the expected responses", func() {
 					onlineCPUs, err := getOnlineCPUs()
 					framework.ExpectNoError(err)

-			// Make sure all the feature gates and the right settings are in place.
-			oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs)
-			defer func() {
-				// restore kubelet config
-				setOldKubeletConfig(f, oldCfg)
-
-				// Delete state file to allow repeated runs
-				deleteStateFile()
-			}()
-
 					configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
 					sd := setupSRIOVConfigOrFail(f, configMap)
 					defer teardownSRIOVConfigOrFail(f, sd)
@@ -577,24 +584,17 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 					ginkgo.By("checking List()")
 					podresourcesListTests(f, cli, sd)
 					ginkgo.By("checking GetAllocatableResources()")
-					podresourcesGetAllocatableResourcesTests(f, cli, sd, onlineCPUs, reservedSystemCPUs)
+					podresourcesGetAllocatableResourcesTests(cli, sd, onlineCPUs, reservedSystemCPUs)
+				})
+			})
 		})

-		ginkgo.It("should return the expected responses with cpumanager none policy", func() {
+		ginkgo.Context("with CPU manager None policy", func() {
+			ginkgo.It("should return the expected responses", func() {
 				// current default is "none" policy - no need to restart the kubelet
 				requireSRIOVDevices()

-			oldCfg := enablePodResourcesFeatureGateInKubelet(f)
-			defer func() {
-				// restore kubelet config
-				setOldKubeletConfig(f, oldCfg)
-
-				// Delete state file to allow repeated runs
-				deleteStateFile()
-			}()
-
 				configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
 				sd := setupSRIOVConfigOrFail(f, configMap)
 				defer teardownSRIOVConfigOrFail(f, sd)
@@ -612,35 +612,44 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 				// intentionally passing empty cpuset instead of onlineCPUs because with none policy
 				// we should get no allocatable cpus - no exclusively allocatable CPUs, depends on policy static
-				podresourcesGetAllocatableResourcesTests(f, cli, sd, cpuset.CPUSet{}, cpuset.CPUSet{})
+				podresourcesGetAllocatableResourcesTests(cli, sd, cpuset.CPUSet{}, cpuset.CPUSet{})
+			})
+		})
 	})

+	ginkgo.Context("without SRIOV devices in the system", func() {
+		ginkgo.BeforeEach(func() {
+			requireLackOfSRIOVDevices()
 		})

-	ginkgo.Context("Without SRIOV devices in the system", func() {
-		ginkgo.It("should return the expected responses with cpumanager static policy enabled", func() {
+		ginkgo.Context("with CPU manager Static policy", func() {
+			ginkgo.BeforeEach(func() {
 				// this is a very rough check. We just want to rule out system that does NOT have enough resources
 				_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
 				if cpuAlloc < minCoreCount {
 					e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount)
 				}
+			})

-			requireLackOfSRIOVDevices()
+			// empty context to apply kubelet config changes
+			ginkgo.Context("", func() {
+				tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+					// Set the CPU Manager policy to static.
+					initialConfig.CPUManagerPolicy = string(cpumanager.PolicyStatic)
+					// Set the CPU Manager reconcile period to 1 second.
+					initialConfig.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
+					cpus := reservedSystemCPUs.String()
+					framework.Logf("configurePodResourcesInKubelet: using reservedSystemCPUs=%q", cpus)
+					initialConfig.ReservedSystemCPUs = cpus
+				})
+				ginkgo.It("should return the expected responses", func() {
 					onlineCPUs, err := getOnlineCPUs()
 					framework.ExpectNoError(err)

-			// Make sure all the feature gates and the right settings are in place.
-			oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs)
-			defer func() {
-				// restore kubelet config
-				setOldKubeletConfig(f, oldCfg)
-
-				// Delete state file to allow repeated runs
-				deleteStateFile()
-			}()
-
 					endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
 					framework.ExpectNoError(err)
@@ -649,23 +658,13 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 					defer conn.Close()

 					podresourcesListTests(f, cli, nil)
-					podresourcesGetAllocatableResourcesTests(f, cli, nil, onlineCPUs, reservedSystemCPUs)
+					podresourcesGetAllocatableResourcesTests(cli, nil, onlineCPUs, reservedSystemCPUs)
+				})
+			})
 		})

-		ginkgo.It("should return the expected responses with cpumanager none policy", func() {
-			// current default is "none" policy - no need to restart the kubelet
-			requireLackOfSRIOVDevices()
-
-			oldCfg := enablePodResourcesFeatureGateInKubelet(f)
-			defer func() {
-				// restore kubelet config
-				setOldKubeletConfig(f, oldCfg)
-
-				// Delete state file to allow repeated runs
-				deleteStateFile()
-			}()
-
+		ginkgo.Context("with CPU manager None policy", func() {
+			ginkgo.It("should return the expected responses", func() {
 				endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
 				framework.ExpectNoError(err)
@@ -675,12 +674,19 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 				// intentionally passing empty cpuset instead of onlineCPUs because with none policy
 				// we should get no allocatable cpus - no exclusively allocatable CPUs, depends on policy static
-				podresourcesGetAllocatableResourcesTests(f, cli, nil, cpuset.CPUSet{}, cpuset.CPUSet{})
+				podresourcesGetAllocatableResourcesTests(cli, nil, cpuset.CPUSet{}, cpuset.CPUSet{})
+			})
+		})
+
+		ginkgo.Context("with disabled KubeletPodResourcesGetAllocatable feature gate", func() {
+			tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+				if initialConfig.FeatureGates == nil {
+					initialConfig.FeatureGates = make(map[string]bool)
+				}
+				initialConfig.FeatureGates[string(kubefeatures.KubeletPodResourcesGetAllocatable)] = false
 			})

 			ginkgo.It("should return the expected error with the feature gate disabled", func() {
-				e2eskipper.SkipIfFeatureGateEnabled(kubefeatures.KubeletPodResourcesGetAllocatable)
-
 				endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
 				framework.ExpectNoError(err)
@@ -694,35 +700,41 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 				framework.ExpectError(err, "With feature gate disabled, the call must fail")
 			})
 		})
+	})

-	ginkgo.Context("Use KubeVirt device plugin, which reports resources w/o hardware topology", func() {
-		ginkgo.It("should return proper podresources the same as before the restart of kubelet", func() {
-			if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResources) {
-				e2eskipper.Skipf("this test is meant to run with the POD Resources Extensions feature gate enabled")
-			}
-
+	ginkgo.Context("with KubeVirt device plugin, which reports resources w/o hardware topology", func() {
+		ginkgo.BeforeEach(func() {
 			_, err := os.Stat("/dev/kvm")
 			if errors.Is(err, os.ErrNotExist) {
 				e2eskipper.Skipf("KubeVirt device plugin could work only in kvm based environment")
 			}
+		})

+		ginkgo.Context("with CPU manager Static policy", func() {
+			ginkgo.BeforeEach(func() {
+				// this is a very rough check. We just want to rule out system that does NOT have enough resources
 				_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
 				if cpuAlloc < minCoreCount {
 					e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount)
 				}
+			})

-			// Make sure all the feature gates and the right settings are in place.
-			oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs)
-			defer func() {
-				// restore kubelet config
-				setOldKubeletConfig(f, oldCfg)
-				// Delete state file to allow repeated runs
-				deleteStateFile()
-			}()
-
+			// empty context to apply kubelet config changes
+			ginkgo.Context("", func() {
+				tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+					// Set the CPU Manager policy to static.
+					initialConfig.CPUManagerPolicy = string(cpumanager.PolicyStatic)
+					// Set the CPU Manager reconcile period to 1 second.
+					initialConfig.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
+
+					cpus := reservedSystemCPUs.String()
+					framework.Logf("configurePodResourcesInKubelet: using reservedSystemCPUs=%q", cpus)
+					initialConfig.ReservedSystemCPUs = cpus
+				})
+
+				ginkgo.It("should return proper podresources the same as before the restart of kubelet", func() {
 					dpPod := setupKubeVirtDevicePluginOrFail(f)
 					defer teardownKubeVirtDevicePluginOrFail(f, dpPod)
@@ -768,7 +780,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 					expectPodResources(1, cli, []podDesc{desc})
 					tpd.deletePodsForTest(f)
 				})
+			})
+		})
 	})
 })
@@ -786,82 +799,6 @@ func getOnlineCPUs() (cpuset.CPUSet, error) {
 	return cpuset.Parse(strings.TrimSpace(string(onlineCPUList)))
 }

-func configurePodResourcesInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) {
-	// we also need CPUManager with static policy to be able to do meaningful testing
-	oldCfg, err := getCurrentKubeletConfig()
-	framework.ExpectNoError(err)
-	newCfg := oldCfg.DeepCopy()
-	newCfg.FeatureGates[string(kubefeatures.KubeletPodResourcesGetAllocatable)] = true
-	// After graduation of the CPU Manager feature to Beta, the CPU Manager
-	// "none" policy is ON by default. But when we set the CPU Manager policy to
-	// "static" in this test and the Kubelet is restarted so that "static"
-	// policy can take effect, there will always be a conflict with the state
-	// checkpointed in the disk (i.e., the policy checkpointed in the disk will
-	// be "none" whereas we are trying to restart Kubelet with "static"
-	// policy). Therefore, we delete the state file so that we can proceed
-	// with the tests.
-	// Only delete the state file at the begin of the tests.
-	if cleanStateFile {
-		deleteStateFile()
-	}
-	// Set the CPU Manager policy to static.
-	newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
-	// Set the CPU Manager reconcile period to 1 second.
-	newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
-	if reservedSystemCPUs.Size() > 0 {
-		cpus := reservedSystemCPUs.String()
-		framework.Logf("configurePodResourcesInKubelet: using reservedSystemCPUs=%q", cpus)
-		newCfg.ReservedSystemCPUs = cpus
-	} else {
-		// The Kubelet panics if either kube-reserved or system-reserved is not set
-		// when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that
-		// kubelet doesn't panic.
-		if newCfg.KubeReserved == nil {
-			newCfg.KubeReserved = map[string]string{}
-		}
-		if _, ok := newCfg.KubeReserved["cpu"]; !ok {
-			newCfg.KubeReserved["cpu"] = "200m"
-		}
-	}
-	// Update the Kubelet configuration.
-	framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
-	// Wait for the Kubelet to be ready.
-	gomega.Eventually(func() bool {
-		nodes, err := e2enode.TotalReady(f.ClientSet)
-		framework.ExpectNoError(err)
-		return nodes == 1
-	}, time.Minute, time.Second).Should(gomega.BeTrue())
-	return oldCfg
-}
-
-func enablePodResourcesFeatureGateInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
-	oldCfg, err := getCurrentKubeletConfig()
-	framework.ExpectNoError(err)
-	newCfg := oldCfg.DeepCopy()
-	if newCfg.FeatureGates == nil {
-		newCfg.FeatureGates = make(map[string]bool)
-	}
-	// Update the Kubelet configuration.
-	framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
-	// Wait for the Kubelet to be ready.
-	gomega.Eventually(func() bool {
-		nodes, err := e2enode.TotalReady(f.ClientSet)
-		framework.ExpectNoError(err)
-		return nodes == 1
-	}, time.Minute, time.Second).Should(gomega.BeTrue())
-	return oldCfg
-}
-
 func setupKubeVirtDevicePluginOrFail(f *framework.Framework) *v1.Pod {
 	e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)
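For context, podresourcesGetAllocatableResourcesTests drives the kubelet's pod-resources gRPC endpoint. Given a connected client (obtained the same way the tests do, via util.LocalEndpoint on the pod-resources socket), a minimal query looks like the sketch below; it assumes the CpuIds and Devices fields of the v1 AllocatableResourcesResponse.

// Sketch: dump what the kubelet reports as allocatable, using the same
// v1 podresources client type the tests above use.
func dumpAllocatableResources(cli kubeletpodresourcesv1.PodResourcesListerClient) error {
	resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
	if err != nil {
		return err
	}
	framework.Logf("allocatable CPU IDs: %v", resp.GetCpuIds())
	for _, dev := range resp.GetDevices() {
		framework.Logf("allocatable device: resource=%q ids=%v", dev.GetResourceName(), dev.GetDeviceIds())
	}
	return nil
}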


@@ -27,23 +27,22 @@ import (
 	"sync"
 	"time"

-	testutils "k8s.io/kubernetes/test/utils"
-
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
+	testutils "k8s.io/kubernetes/test/utils"

 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -194,7 +193,7 @@ func findNUMANodeWithoutSRIOVDevices(configMap *v1.ConfigMap, numaNodes int) (in
 	return findNUMANodeWithoutSRIOVDevicesFromSysfs(numaNodes)
 }

-func configureTopologyManagerInKubelet(f *framework.Framework, oldCfg *kubeletconfig.KubeletConfiguration, policy, scope string, configMap *v1.ConfigMap, numaNodes int) string {
+func configureTopologyManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguration, policy, scope string, configMap *v1.ConfigMap, numaNodes int) (*kubeletconfig.KubeletConfiguration, string) {
 	// Configure Topology Manager in Kubelet with policy.
 	newCfg := oldCfg.DeepCopy()
 	if newCfg.FeatureGates == nil {
@@ -204,8 +203,6 @@ func configureTopologyManagerInKubelet(f *framework.Framework, oldCfg *kubeletco
 	newCfg.FeatureGates["CPUManager"] = true
 	newCfg.FeatureGates["TopologyManager"] = true

-	deleteStateFile()
-
 	// Set the Topology Manager policy
 	newCfg.TopologyManagerPolicy = policy
@@ -237,17 +234,7 @@ func configureTopologyManagerInKubelet(f *framework.Framework, oldCfg *kubeletco
 	// Dump the config -- debug
 	framework.Logf("New kubelet config is %s", *newCfg)

-	// Update the Kubelet configuration.
-	framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
-
-	// Wait for the Kubelet to be ready.
-	gomega.Eventually(func() bool {
-		nodes, err := e2enode.TotalReady(f.ClientSet)
-		framework.ExpectNoError(err)
-		return nodes == 1
-	}, time.Minute, time.Second).Should(gomega.BeTrue())
-
-	return newCfg.ReservedSystemCPUs
+	return newCfg, newCfg.ReservedSystemCPUs
 }

 // getSRIOVDevicePluginPod returns the Device Plugin pod for sriov resources in e2e tests.
@@ -900,7 +887,8 @@ func runTopologyManagerTests(f *framework.Framework) {
 			ginkgo.By(fmt.Sprintf("by configuring Topology Manager policy to %s", policy))
 			framework.Logf("Configuring topology Manager policy to %s", policy)

-			configureTopologyManagerInKubelet(f, oldCfg, policy, scope, nil, 0)
+			newCfg, _ := configureTopologyManagerInKubelet(oldCfg, policy, scope, nil, 0)
+			updateKubeletConfig(f, newCfg, true)
 			// Run the tests
 			runTopologyManagerPolicySuiteTests(f)
 		}
@@ -923,7 +911,8 @@ func runTopologyManagerTests(f *framework.Framework) {
 			ginkgo.By(fmt.Sprintf("by configuring Topology Manager policy to %s", policy))
 			framework.Logf("Configuring topology Manager policy to %s", policy)

-			reservedSystemCPUs := configureTopologyManagerInKubelet(f, oldCfg, policy, scope, configMap, numaNodes)
+			newCfg, reservedSystemCPUs := configureTopologyManagerInKubelet(oldCfg, policy, scope, configMap, numaNodes)
+			updateKubeletConfig(f, newCfg, true)

 			runTopologyManagerNodeAlignmentSuiteTests(f, sd, reservedSystemCPUs, policy, numaNodes, coreCount)
 		}
@@ -940,14 +929,15 @@ func runTopologyManagerTests(f *framework.Framework) {
 		policy := topologymanager.PolicySingleNumaNode
 		scope := podScopeTopology

-		reservedSystemCPUs := configureTopologyManagerInKubelet(f, oldCfg, policy, scope, configMap, numaNodes)
+		newCfg, reservedSystemCPUs := configureTopologyManagerInKubelet(oldCfg, policy, scope, configMap, numaNodes)
+		updateKubeletConfig(f, newCfg, true)

 		runTMScopeResourceAlignmentTestSuite(f, configMap, reservedSystemCPUs, policy, numaNodes, coreCount)
 	})

 	ginkgo.AfterEach(func() {
 		// restore kubelet config
-		setOldKubeletConfig(f, oldCfg)
+		updateKubeletConfig(f, oldCfg, true)
 	})
 }

View File

@ -50,10 +50,12 @@ import (
kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec" kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo"
@ -73,6 +75,9 @@ const (
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
kubeletReadOnlyPort = "10255" kubeletReadOnlyPort = "10255"
kubeletHealthCheckURL = "http://127.0.0.1:" + kubeletReadOnlyPort + "/healthz" kubeletHealthCheckURL = "http://127.0.0.1:" + kubeletReadOnlyPort + "/healthz"
// state files
cpuManagerStateFile = "/var/lib/kubelet/cpu_manager_state"
memoryManagerStateFile = "/var/lib/kubelet/memory_manager_state"
) )
func getNodeSummary() (*stats.Summary, error) { func getNodeSummary() (*stats.Summary, error) {
@ -156,30 +161,67 @@ func getCurrentKubeletConfig() (*kubeletconfig.KubeletConfiguration, error) {
// Returns true on success. // Returns true on success.
func tempSetCurrentKubeletConfig(f *framework.Framework, updateFunction func(initialConfig *kubeletconfig.KubeletConfiguration)) { func tempSetCurrentKubeletConfig(f *framework.Framework, updateFunction func(initialConfig *kubeletconfig.KubeletConfiguration)) {
var oldCfg *kubeletconfig.KubeletConfiguration var oldCfg *kubeletconfig.KubeletConfiguration
ginkgo.BeforeEach(func() { ginkgo.BeforeEach(func() {
configEnabled, err := isKubeletConfigEnabled(f) var err error
oldCfg, err = getCurrentKubeletConfig()
framework.ExpectNoError(err)
framework.ExpectEqual(configEnabled, true, "The Dynamic Kubelet Configuration feature is not enabled.\n"+
"Pass --feature-gates=DynamicKubeletConfig=true to the Kubelet to enable this feature.\n"+
"For `make test-e2e-node`, you can set `TEST_ARGS='--feature-gates=DynamicKubeletConfig=true'`.")
oldCfg, err = getCurrentKubeletConfig()
framework.ExpectNoError(err) framework.ExpectNoError(err)
newCfg := oldCfg.DeepCopy() newCfg := oldCfg.DeepCopy()
updateFunction(newCfg) updateFunction(newCfg)
if apiequality.Semantic.DeepEqual(*newCfg, *oldCfg) { if apiequality.Semantic.DeepEqual(*newCfg, *oldCfg) {
return return
} }
framework.ExpectNoError(setKubeletConfiguration(f, newCfg)) updateKubeletConfig(f, newCfg, true)
}) })
ginkgo.AfterEach(func() { ginkgo.AfterEach(func() {
if oldCfg != nil { if oldCfg != nil {
err := setKubeletConfiguration(f, oldCfg) // Update the Kubelet configuration.
framework.ExpectNoError(err) updateKubeletConfig(f, oldCfg, true)
} }
}) })
} }
func updateKubeletConfig(f *framework.Framework, kubeletConfig *kubeletconfig.KubeletConfiguration, deleteStateFiles bool) {
// Update the Kubelet configuration.
ginkgo.By("Stopping the kubelet")
startKubelet := stopKubelet()
// wait until the kubelet health check fails
gomega.Eventually(func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, time.Minute, time.Second).Should(gomega.BeFalse())
// Delete the CPU and memory manager state files to make sure they do not prevent the kubelet from restarting
if deleteStateFiles {
deleteStateFile(cpuManagerStateFile)
deleteStateFile(memoryManagerStateFile)
}
framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(kubeletConfig))
ginkgo.By("Starting the kubelet")
startKubelet()
// wait until the kubelet health check succeeds
gomega.Eventually(func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
// Wait for the Kubelet to be ready.
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
}
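kubeletHealthCheck itself is not part of this hunk; for orientation, a hypothetical sketch of the kind of probe it performs against kubeletHealthCheckURL (an assumption based on the constant above, not the real implementation, which may use a dedicated client or headers):

	// kubeletHealthCheckSketch is a hypothetical stand-in: any HTTP 200 from the
	// kubelet /healthz endpoint counts as healthy. Assumes net/http is imported.
	func kubeletHealthCheckSketch(url string) bool {
		resp, err := http.Get(url)
		if err != nil {
			return false
		}
		defer resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}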
func deleteStateFile(stateFileName string) {
err := exec.Command("/bin/sh", "-c", fmt.Sprintf("rm -f %s", stateFileName)).Run()
framework.ExpectNoError(err, "failed to delete the state file")
}
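deleteStateFile shells out to rm -f; a pure-Go alternative would look like the following sketch (not what this change does), tolerating a missing file to keep the same semantics and assuming "os" is imported:

	// Remove the state file, ignoring "not exist" to mirror rm -f.
	if err := os.Remove(stateFileName); err != nil && !os.IsNotExist(err) {
		framework.ExpectNoError(err, "failed to delete the state file")
	}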
// Returns true if kubeletConfig is enabled, false otherwise or if we cannot determine if it is. // Returns true if kubeletConfig is enabled, false otherwise or if we cannot determine if it is.
func isKubeletConfigEnabled(f *framework.Framework) (bool, error) { func isKubeletConfigEnabled(f *framework.Framework) (bool, error) {
cfgz, err := getCurrentKubeletConfig() cfgz, err := getCurrentKubeletConfig()
@ -193,64 +235,6 @@ func isKubeletConfigEnabled(f *framework.Framework) (bool, error) {
return v, nil return v, nil
} }
// Creates or updates the configmap for KubeletConfiguration, waits for the Kubelet to restart
// with the new configuration. Returns an error if the configuration after waiting for restartGap
// doesn't match what you attempted to set, or if the dynamic configuration feature is disabled.
// You should only call this from serial tests.
func setKubeletConfiguration(f *framework.Framework, kubeCfg *kubeletconfig.KubeletConfiguration) error {
const (
restartGap = 40 * time.Second
pollInterval = 5 * time.Second
)
// make sure Dynamic Kubelet Configuration feature is enabled on the Kubelet we are about to reconfigure
if configEnabled, err := isKubeletConfigEnabled(f); err != nil {
return err
} else if !configEnabled {
return fmt.Errorf("The Dynamic Kubelet Configuration feature is not enabled.\n" +
"Pass --feature-gates=DynamicKubeletConfig=true to the Kubelet to enable this feature.\n" +
"For `make test-e2e-node`, you can set `TEST_ARGS='--feature-gates=DynamicKubeletConfig=true'`.")
}
// create the ConfigMap with the new configuration
cm, err := createConfigMap(f, kubeCfg)
if err != nil {
return err
}
// create the reference and set Node.Spec.ConfigSource
src := &v1.NodeConfigSource{
ConfigMap: &v1.ConfigMapNodeConfigSource{
Namespace: "kube-system",
Name: cm.Name,
KubeletConfigKey: "kubelet",
},
}
// set the source, retry a few times in case we are competing with other writers
gomega.Eventually(func() error {
if err := setNodeConfigSource(f, src); err != nil {
return err
}
return nil
}, time.Minute, time.Second).Should(gomega.BeNil())
// poll for new config, for a maximum wait of restartGap
gomega.Eventually(func() error {
newKubeCfg, err := getCurrentKubeletConfig()
if err != nil {
return fmt.Errorf("failed trying to get current Kubelet config, will retry, error: %v", err)
}
if !apiequality.Semantic.DeepEqual(*kubeCfg, *newKubeCfg) {
return fmt.Errorf("still waiting for new configuration to take effect, will continue to watch /configz")
}
klog.Infof("new configuration has taken effect")
return nil
}, restartGap, pollInterval).Should(gomega.BeNil())
return nil
}
// sets the current node's configSource, this should only be called from Serial tests // sets the current node's configSource, this should only be called from Serial tests
func setNodeConfigSource(f *framework.Framework, source *v1.NodeConfigSource) error { func setNodeConfigSource(f *framework.Framework, source *v1.NodeConfigSource) error {
// since this is a serial test, we just get the node, change the source, and then update it // since this is a serial test, we just get the node, change the source, and then update it
@ -275,16 +259,6 @@ func setNodeConfigSource(f *framework.Framework, source *v1.NodeConfigSource) er
return nil return nil
} }
// creates a configmap containing kubeCfg in kube-system namespace
func createConfigMap(f *framework.Framework, internalKC *kubeletconfig.KubeletConfiguration) (*v1.ConfigMap, error) {
cmap := newKubeletConfigMap("testcfg", internalKC)
cmap, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(context.TODO(), cmap, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return cmap, nil
}
// constructs a ConfigMap, populating one of its keys with the KubeletConfiguration. Always uses GenerateName to generate a suffix. // constructs a ConfigMap, populating one of its keys with the KubeletConfiguration. Always uses GenerateName to generate a suffix.
func newKubeletConfigMap(name string, internalKC *kubeletconfig.KubeletConfiguration) *v1.ConfigMap { func newKubeletConfigMap(name string, internalKC *kubeletconfig.KubeletConfiguration) *v1.ConfigMap {
data, err := kubeletconfigcodec.EncodeKubeletConfig(internalKC, kubeletconfigv1beta1.SchemeGroupVersion) data, err := kubeletconfigcodec.EncodeKubeletConfig(internalKC, kubeletconfigv1beta1.SchemeGroupVersion)
@ -447,8 +421,9 @@ func stopKubelet() func() {
framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %s", err, string(stdout)) framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %s", err, string(stdout))
return func() { return func() {
stdout, err := exec.Command("sudo", "systemctl", "start", kubeletServiceName).CombinedOutput() // we should restart service, otherwise the transient service start will fail
framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %s", err, string(stdout)) stdout, err := exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput()
framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %v", err, stdout)
} }
} }
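The closure returned by stopKubelet now issues systemctl restart rather than start: per the comment above, starting the transient kubelet unit again with a plain start can fail, so restart is used instead. Typical use of the stop/start pair, as in updateKubeletConfig above:

	startKubelet := stopKubelet() // stops the unit and hands back a function that brings it up again
	// rewrite the kubelet config file on disk here
	startKubelet()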