Merge pull request #102882 from fromanirh/device-manager-checkpoints

devicemanager: checkpoint: support pre-1.20 data
Kubernetes Prow Robot 2021-11-02 16:56:57 -07:00 committed by GitHub
commit 359b722c19
7 changed files with 605 additions and 20 deletions

@@ -27,7 +27,7 @@ import (
// DeviceManagerCheckpoint defines the operations to retrieve pod devices
type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint
GetData() ([]PodDevicesEntry, map[string][]string)
GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string)
}
// DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id
@@ -72,9 +72,12 @@ func (dev DevicesPerNUMA) Devices() sets.String {
return result
}
// New returns an instance of Checkpoint
func New(devEntries []PodDevicesEntry,
devices map[string][]string) DeviceManagerCheckpoint {
// New returns an instance of Checkpoint; it must be an alias for the most recent version
func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return NewV2(devEntries, devices)
}
func NewV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return &Data{
Data: checkpointData{
PodDeviceEntries: devEntries,
@@ -99,7 +102,8 @@ func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}
// GetData returns device entries and registered devices
func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *Data) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices
}
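
For orientation, a minimal sketch (not part of this change, wrapped in a hypothetical standalone main) of how a caller could exercise the renamed accessor: build a checkpoint in the latest format through New, which now aliases NewV2, marshal it, and read the entries back with GetDataInLatestFormat. The import path, constructors, and field names are the ones visible in this diff; the device names are illustrative.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
)

func main() {
	// Devices for one container, keyed by NUMA node (the post-1.20 layout).
	devs := checkpoint.NewDevicesPerNUMA()
	devs[0] = []string{"DevA1"}

	entries := []checkpoint.PodDevicesEntry{{
		PodUID:        "pod-uid",
		ContainerName: "cnt",
		ResourceName:  "example.com/deviceA",
		DeviceIDs:     devs,
	}}
	registered := map[string][]string{"example.com/deviceA": {"DevA1", "DevA2"}}

	// New stays an alias for the most recent format (NewV2 at this point).
	cp := checkpoint.New(entries, registered)

	// MarshalCheckpoint fills in the checksum and serializes the data to JSON.
	blob, err := cp.MarshalCheckpoint()
	if err != nil {
		panic(err)
	}
	fmt.Printf("checkpoint blob: %d bytes\n", len(blob))

	// Callers always get the data back in the most recent format.
	podDevs, regDevs := cp.GetDataInLatestFormat()
	fmt.Println(len(podDevs), "pod device entries,", len(regDevs), "registered resources")
}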

@@ -0,0 +1,124 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"encoding/json"
"hash/fnv"
"strings"
"github.com/davecgh/go-spew/spew"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
// PodDevicesEntryV1 connects pod information to devices, without topology information (k8s <= 1.19)
type PodDevicesEntryV1 struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// checkpointDataV1 struct is used to store pod-to-device allocation information
// in a checkpoint file, without topology information (k8s <= 1.19)
type checkpointDataV1 struct {
PodDeviceEntries []PodDevicesEntryV1
RegisteredDevices map[string][]string
}
// checksum computes the checksum using the same algorithm (and data type names) k8s 1.19 used.
// We need this special code path to be able to correctly validate the checksum k8s 1.19 wrote.
// credits to https://github.com/kubernetes/kubernetes/pull/102717/commits/353f93895118d2ffa2d59a29a1fbc225160ea1d6
func (cp checkpointDataV1) checksum() checksum.Checksum {
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
object := printer.Sprintf("%#v", cp)
object = strings.Replace(object, "checkpointDataV1", "checkpointData", 1)
object = strings.Replace(object, "PodDevicesEntryV1", "PodDevicesEntry", -1)
hash := fnv.New32a()
printer.Fprintf(hash, "%v", object)
return checksum.Checksum(hash.Sum32())
}
// DataV1 holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
type DataV1 struct {
Data checkpointDataV1
Checksum checksum.Checksum
}
// NewV1 returns an instance of Checkpoint in V1 (k8s <= 1.19) format.
// Users should avoid creating checkpoints in formats other than the most recent one;
// use the old formats only to validate existing checkpoints and convert them to the
// most recent format. The only exception should be test code.
func NewV1(devEntries []PodDevicesEntryV1,
devices map[string][]string) DeviceManagerCheckpoint {
return &DataV1{
Data: checkpointDataV1{
PodDeviceEntries: devEntries,
RegisteredDevices: devices,
},
}
}
// MarshalCheckpoint is needed to implement the Checkpoint interface, but should not be called anymore
func (cp *DataV1) MarshalCheckpoint() ([]byte, error) {
klog.InfoS("Marshalling a device manager V1 checkpoint")
cp.Checksum = cp.Data.checksum()
return json.Marshal(*cp)
}
// UnmarshalCheckpoint unmarshals checkpoint data from a JSON blob
func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that the passed checksum is the same as the calculated checksum
func (cp *DataV1) VerifyChecksum() error {
if cp.Checksum != cp.Data.checksum() {
return errors.ErrCorruptCheckpoint
}
return nil
}
// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *DataV1) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
var podDevs []PodDevicesEntry
for _, entryV1 := range cp.Data.PodDeviceEntries {
devsPerNuma := NewDevicesPerNUMA()
// no NUMA cell affinity was recorded. The only possible choice is to
// treat all the devices as affine to NUMA node 0.
devsPerNuma[0] = entryV1.DeviceIDs
podDevs = append(podDevs, PodDevicesEntry{
PodUID: entryV1.PodUID,
ContainerName: entryV1.ContainerName,
ResourceName: entryV1.ResourceName,
DeviceIDs: devsPerNuma,
AllocResp: entryV1.AllocResp,
})
}
return podDevs, cp.Data.RegisteredDevices
}
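
To illustrate the restore path this file enables, a hypothetical sketch (not part of the change): a V1 checkpoint round-trips through NewV1 with the 1.19-compatible checksum, and GetDataInLatestFormat converts it by pinning every device to NUMA node 0. The marshal step mirrors what the e2e helper rewriteCheckpointAsV1 further down does; constructors and field names are taken from this diff, the values are made up.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
)

func main() {
	// A pre-1.20 entry: DeviceIDs is a flat []string, no NUMA information.
	v1Entries := []checkpoint.PodDevicesEntryV1{{
		PodUID:        "pod-uid",
		ContainerName: "cnt",
		ResourceName:  "example.com/deviceA",
		DeviceIDs:     []string{"DevA3", "DevA4"},
	}}
	registered := map[string][]string{"example.com/deviceA": {"DevA3", "DevA4"}}

	// Marshal with the V1 type so the blob carries the 1.19-compatible checksum.
	blob, err := checkpoint.NewV1(v1Entries, registered).MarshalCheckpoint()
	if err != nil {
		panic(err)
	}

	// Restore it the way the device manager fallback path does.
	restored := checkpoint.NewV1(nil, nil)
	if err := restored.UnmarshalCheckpoint(blob); err != nil {
		panic(err)
	}
	if err := restored.VerifyChecksum(); err != nil {
		panic(err)
	}

	// Conversion to the latest format: every device ID ends up under NUMA node 0.
	podDevs, _ := restored.GetDataInLatestFormat()
	for _, e := range podDevs {
		fmt.Println(e.ResourceName, e.DeviceIDs) // map[0:[DevA3 DevA4]]
	}
}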

@@ -599,20 +599,33 @@ func (m *ManagerImpl) writeCheckpoint() error {
// Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
// The vast majority of the time we restore a compatible checkpoint, so we try
// the current version first. Trying to restore older-format checkpoints is
// relevant only in the kubelet upgrade flow, which happens once in a
// (long) while.
cp, err := m.getCheckpointV2()
if err != nil {
if err == errors.ErrCheckpointNotFound {
klog.InfoS("Failed to retrieve checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
// no point in trying anything else
klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
return nil
}
return err
var errv1 error
// one last try: maybe it's an old-format checkpoint?
cp, errv1 = m.getCheckpointV1()
if errv1 != nil {
klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
// intentionally return the parent error. We expect to restore V1 checkpoints
// a tiny fraction of the time, so what matters most is the current-format read error.
return err
}
klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
}
m.mutex.Lock()
defer m.mutex.Unlock()
podDevices, registeredDevs := cp.GetData()
podDevices, registeredDevs := cp.GetDataInLatestFormat()
m.podDevices.fromCheckpointData(podDevices)
m.allocatedDevices = m.podDevices.devices()
for resource := range registeredDevs {
@@ -625,6 +638,22 @@ func (m *ManagerImpl) readCheckpoint() error {
return nil
}
func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}
func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntryV1, 0)
cp := checkpoint.NewV1(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
if !m.sourcesReady.AllReady() {

@@ -25,6 +25,7 @@ import (
"testing"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
@@ -1288,3 +1289,25 @@ func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]p
}
return res
}
const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"
var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`
func TestReadPreNUMACheckpoint(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
err = ioutil.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644)
require.NoError(t, err)
topologyStore := topologymanager.NewFakeManager()
nodes := []cadvisorapi.Node{{Id: 0}}
m, err := newManagerImpl(socketName, nodes, topologyStore)
require.NoError(t, err)
// TODO: we should not be calling private methods, but the existing tests already do anyway
err = m.readCheckpoint()
require.NoError(t, err)
}

@@ -0,0 +1,370 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
)
const (
devicePluginDir = "/var/lib/kubelet/device-plugins"
checkpointName = "kubelet_internal_checkpoint"
)
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeature:DeviceManager]", func() {
checkpointFullPath := filepath.Join(devicePluginDir, checkpointName)
f := framework.NewDefaultFramework("devicemanager-test")
ginkgo.Context("With SRIOV devices in the system", func() {
// this test wants to reproduce what happened in https://github.com/kubernetes/kubernetes/issues/102880
ginkgo.It("should be able to recover V1 (aka pre-1.20) checkpoint data and reject pods before device re-registration", func() {
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 {
e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
}
oldCfg := enablePodResourcesFeatureGateInKubelet(f)
defer func() {
// restore kubelet config
setOldKubeletConfig(f, oldCfg)
// Delete state file to allow repeated runs
deleteStateFile()
}()
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
sd := setupSRIOVConfigOrFail(f, configMap)
waitForSRIOVResources(f, sd)
cntName := "gu-container"
// we create and delete a pod to make sure the internal device manager state contains a pod allocation
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName))
var initCtnAttrs []tmCtnAttribute
ctnAttrs := []tmCtnAttribute{
{
ctnName: cntName,
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceName: sd.resourceName,
deviceRequest: "1",
deviceLimit: "1",
},
}
podName := "gu-pod-rec-pre-1"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = f.PodClient().CreateSync(pod)
// now we need to simulate a node drain, so we remove all the pods, including the sriov device plugin.
ginkgo.By("deleting the pod")
// note we delete the pod right now because we know the current devicemanager implementation will NOT
// clean up on pod deletion. When that changes, the deletion will need to happen after the test completes.
deletePodSyncByName(f, pod.Name)
waitForAllContainerRemoval(pod.Name, pod.Namespace)
ginkgo.By("teardown the sriov device plugin")
// since we will NOT be recreating the plugin, we clean up everything now
teardownSRIOVConfigOrFail(f, sd)
ginkgo.By("stopping the kubelet")
killKubelet("SIGSTOP")
ginkgo.By("rewriting the kubelet checkpoint file as v1")
err := rewriteCheckpointAsV1(devicePluginDir, checkpointName)
// make sure we remove any leftovers
defer os.Remove(checkpointFullPath)
framework.ExpectNoError(err)
// this mimics a kubelet restart after the upgrade
// TODO: is SIGTERM (less brutal) good enough?
ginkgo.By("killing the kubelet")
killKubelet("SIGKILL")
ginkgo.By("waiting for the kubelet to be ready again")
// Wait for the Kubelet to be ready.
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
// note we DO NOT start the sriov device plugin. This is intentional.
// issue#102880 reproduces because of a race on startup, caused by corrupted device manager
// state, which leads to the v1.Node object not being updated on the apiserver.
// So to hit the issue we need the kubelet to receive the pod *before* the device plugin registers itself.
// The simplest and safest way to reproduce this is just not to run the device plugin again.
podName = "gu-pod-rec-post-2"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod = makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = f.PodClient().Create(pod)
err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) {
if pod.Status.Phase != v1.PodPending {
return true, nil
}
return false, nil
})
framework.ExpectNoError(err)
pod, err = f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
if pod.Status.Phase != v1.PodFailed {
framework.Failf("pod %s not failed: %v", pod.Name, pod.Status)
}
framework.Logf("checking pod %s status reason (%s)", pod.Name, pod.Status.Reason)
if !isUnexpectedAdmissionError(pod) {
framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason)
}
deletePodSyncByName(f, pod.Name)
})
ginkgo.It("should be able to recover V1 (aka pre-1.20) checkpoint data and update topology info on device re-registration", func() {
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 {
e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
}
oldCfg := enablePodResourcesFeatureGateInKubelet(f)
defer func() {
// restore kubelet config
setOldKubeletConfig(f, oldCfg)
// Delete state file to allow repeated runs
deleteStateFile()
}()
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
sd := setupSRIOVConfigOrFail(f, configMap)
waitForSRIOVResources(f, sd)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
conn.Close()
framework.ExpectNoError(err)
suitableDevs := 0
for _, dev := range resp.GetDevices() {
for _, node := range dev.GetTopology().GetNodes() {
if node.GetID() != 0 {
suitableDevs++
}
}
}
if suitableDevs == 0 {
teardownSRIOVConfigOrFail(f, sd)
e2eskipper.Skipf("no devices found on NUMA Cell other than 0")
}
cntName := "gu-container"
// we create and delete a pod to make sure the internal device manager state contains a pod allocation
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName))
var initCtnAttrs []tmCtnAttribute
ctnAttrs := []tmCtnAttribute{
{
ctnName: cntName,
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceName: sd.resourceName,
deviceRequest: "1",
deviceLimit: "1",
},
}
podName := "gu-pod-rec-pre-1"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = f.PodClient().CreateSync(pod)
// now we need to simulate a node drain, so we remove all the pods, including the sriov device plugin.
ginkgo.By("deleting the pod")
// note we delete the pod right now because we know the current devicemanager implementation will NOT
// clean up on pod deletion. When that changes, the deletion will need to happen after the test completes.
deletePodSyncByName(f, pod.Name)
waitForAllContainerRemoval(pod.Name, pod.Namespace)
ginkgo.By("teardown the sriov device plugin")
// keep the configMap and serviceAccount around to speed up the later re-setup; only delete the plugin pod
deleteSRIOVPodOrFail(f, sd)
ginkgo.By("stopping the kubelet")
killKubelet("SIGSTOP")
ginkgo.By("rewriting the kubelet checkpoint file as v1")
err = rewriteCheckpointAsV1(devicePluginDir, checkpointName)
// make sure we remove any leftovers
defer os.Remove(checkpointFullPath)
framework.ExpectNoError(err)
// this mimics a kubelet restart after the upgrade
// TODO: is SIGTERM (less brutal) good enough?
ginkgo.By("killing the kubelet")
killKubelet("SIGKILL")
ginkgo.By("waiting for the kubelet to be ready again")
// Wait for the Kubelet to be ready.
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
sd2 := &sriovData{
configMap: sd.configMap,
serviceAccount: sd.serviceAccount,
}
sd2.pod = createSRIOVPodOrFail(f)
defer teardownSRIOVConfigOrFail(f, sd2)
waitForSRIOVResources(f, sd2)
compareSRIOVResources(sd, sd2)
cli, conn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
resp2, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectNoError(err)
cntDevs := stringifyContainerDevices(resp.GetDevices())
cntDevs2 := stringifyContainerDevices(resp2.GetDevices())
if cntDevs != cntDevs2 {
framework.Failf("different allocatable resources expected %v got %v", cntDevs, cntDevs2)
}
})
})
})
func compareSRIOVResources(expected, got *sriovData) {
if expected.resourceName != got.resourceName {
framework.Failf("different SRIOV resource name: expected %q got %q", expected.resourceName, got.resourceName)
}
if expected.resourceAmount != got.resourceAmount {
framework.Failf("different SRIOV resource amount: expected %d got %d", expected.resourceAmount, got.resourceAmount)
}
}
func isUnexpectedAdmissionError(pod *v1.Pod) bool {
re := regexp.MustCompile(`Unexpected.*Admission.*Error`)
return re.MatchString(pod.Status.Reason)
}
func rewriteCheckpointAsV1(dir, name string) error {
ginkgo.By(fmt.Sprintf("Creating temporary checkpoint manager (dir=%q)", dir))
checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
return err
}
cp := checkpoint.New(make([]checkpoint.PodDevicesEntry, 0), make(map[string][]string))
err = checkpointManager.GetCheckpoint(name, cp)
if err != nil {
return err
}
ginkgo.By(fmt.Sprintf("Read checkpoint %q %#v", name, cp))
podDevices, registeredDevs := cp.GetDataInLatestFormat()
podDevicesV1 := convertPodDeviceEntriesToV1(podDevices)
cpV1 := checkpoint.NewV1(podDevicesV1, registeredDevs)
blob, err := cpV1.MarshalCheckpoint()
if err != nil {
return err
}
// TODO: why doesn't `checkpointManager.CreateCheckpoint(name, cpV1)` seem to work?
ckPath := filepath.Join(dir, name)
ioutil.WriteFile(filepath.Join("/tmp", name), blob, 0600)
return ioutil.WriteFile(ckPath, blob, 0600)
}
func convertPodDeviceEntriesToV1(entries []checkpoint.PodDevicesEntry) []checkpoint.PodDevicesEntryV1 {
entriesv1 := []checkpoint.PodDevicesEntryV1{}
for _, entry := range entries {
deviceIDs := []string{}
for _, perNUMANodeDevIDs := range entry.DeviceIDs {
deviceIDs = append(deviceIDs, perNUMANodeDevIDs...)
}
entriesv1 = append(entriesv1, checkpoint.PodDevicesEntryV1{
PodUID: entry.PodUID,
ContainerName: entry.ContainerName,
ResourceName: entry.ResourceName,
DeviceIDs: deviceIDs,
AllocResp: entry.AllocResp,
})
}
return entriesv1
}
func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) string {
entries := []string{}
for _, dev := range devs {
devIDs := dev.GetDeviceIds()
if devIDs != nil {
for _, devID := range dev.DeviceIds {
nodes := dev.GetTopology().GetNodes()
if nodes != nil {
for _, node := range nodes {
entries = append(entries, fmt.Sprintf("%s[%s]@NUMA=%d", dev.ResourceName, devID, node.GetID()))
}
} else {
entries = append(entries, fmt.Sprintf("%s[%s]@NUMA=none", dev.ResourceName, devID))
}
}
} else {
entries = append(entries, dev.ResourceName)
}
}
sort.Strings(entries)
return strings.Join(entries, ", ")
}

@@ -505,6 +505,15 @@ type sriovData struct {
}
func setupSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sriovData {
sd := createSRIOVConfigOrFail(f, configMap)
e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)
sd.pod = createSRIOVPodOrFail(f)
return sd
}
func createSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sriovData {
var err error
ginkgo.By(fmt.Sprintf("Creating configMap %v/%v", metav1.NamespaceSystem, configMap.Name))
@@ -522,8 +531,13 @@ func setupSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sr
framework.Failf("unable to create test serviceAccount %s: %v", serviceAccount.Name, err)
}
e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)
return &sriovData{
configMap: configMap,
serviceAccount: serviceAccount,
}
}
func createSRIOVPodOrFail(f *framework.Framework) *v1.Pod {
dp := getSRIOVDevicePluginPod()
dp.Spec.NodeName = framework.TestContext.NodeName
@@ -536,11 +550,7 @@ func setupSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sr
}
framework.ExpectNoError(err)
return &sriovData{
configMap: configMap,
serviceAccount: serviceAccount,
pod: dpPod,
}
return dpPod
}
// waitForSRIOVResources waits until enough SRIOV resources are available, expecting to complete within the timeout.
@@ -560,7 +570,7 @@ func waitForSRIOVResources(f *framework.Framework, sd *sriovData) {
framework.Logf("Detected SRIOV allocatable devices name=%q amount=%d", sd.resourceName, sd.resourceAmount)
}
func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
func deleteSRIOVPodOrFail(f *framework.Framework, sd *sriovData) {
var err error
gp := int64(0)
deleteOptions := metav1.DeleteOptions{
@@ -571,6 +581,14 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
err = f.ClientSet.CoreV1().Pods(sd.pod.Namespace).Delete(context.TODO(), sd.pod.Name, deleteOptions)
framework.ExpectNoError(err)
waitForAllContainerRemoval(sd.pod.Name, sd.pod.Namespace)
}
func removeSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
var err error
gp := int64(0)
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: &gp,
}
ginkgo.By(fmt.Sprintf("Deleting configMap %v/%v", metav1.NamespaceSystem, sd.configMap.Name))
err = f.ClientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(context.TODO(), sd.configMap.Name, deleteOptions)
@@ -581,6 +599,11 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
framework.ExpectNoError(err)
}
func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
deleteSRIOVPodOrFail(f, sd)
removeSRIOVConfigOrFail(f, sd)
}
func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) {
threadsPerCore := getSMTLevel()
sd := setupSRIOVConfigOrFail(f, configMap)

@@ -452,6 +452,18 @@ func stopKubelet() func() {
}
}
// killKubelet sends a signal (SIGINT, SIGSTOP, SIGTERM...) to the running kubelet
func killKubelet(sig string) {
kubeletServiceName := findKubeletServiceName(true)
// reset the kubelet service start-limit-hit
stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %v", err, stdout)
stdout, err = exec.Command("sudo", "systemctl", "kill", "-s", sig, kubeletServiceName).CombinedOutput()
framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %v", err, stdout)
}
func kubeletHealthCheck(url string) bool {
insecureTransport := http.DefaultTransport.(*http.Transport).Clone()
insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}