node: devicemgr: remove obsolete pre-1.20 checkpoint file support

In commit 2f426fdba6 we added
compatibility (and tests) to deal with pre-1.20 checkpoint files.
We are now well past the end of support for pre-1.20 kubelets,
so we can get rid of this code.

Signed-off-by: Francesco Romani <fromani@redhat.com>
This commit is contained in:
Francesco Romani 2024-02-20 17:40:23 +01:00
parent 95a6f2e4dc
commit 181fb0da51
6 changed files with 7 additions and 486 deletions

View File

@ -27,7 +27,7 @@ import (
// DeviceManagerCheckpoint defines the operations to retrieve pod devices // DeviceManagerCheckpoint defines the operations to retrieve pod devices
type DeviceManagerCheckpoint interface { type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint checkpointmanager.Checkpoint
GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) GetData() ([]PodDevicesEntry, map[string][]string)
} }
// DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id // DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id
@ -102,8 +102,8 @@ func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data) return cp.Checksum.Verify(cp.Data)
} }
// GetDataInLatestFormat returns device entries and registered devices in the *most recent* // GetData returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk. // checkpoint format, *not* in the original format stored on disk.
func (cp *Data) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) { func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices
} }

View File

@ -1,117 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"encoding/json"
"fmt"
"hash/fnv"
"strings"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
// PodDevicesEntryV1 connects pod information to devices, without topology information (k8s <= 1.19)
type PodDevicesEntryV1 struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// checkpointDataV1 struct is used to store pod to device allocation information
// in a checkpoint file, without topology information (k8s <= 1.19)
type checkpointDataV1 struct {
PodDeviceEntries []PodDevicesEntryV1
RegisteredDevices map[string][]string
}
// checksum compute the checksum using the same algorithms (and data type names) k8s 1.19 used.
// We need this special code path to be able to correctly validate the checksum k8s 1.19 wrote.
// credits to https://github.com/kubernetes/kubernetes/pull/102717/commits/353f93895118d2ffa2d59a29a1fbc225160ea1d6
func (cp checkpointDataV1) checksum() checksum.Checksum {
object := dump.ForHash(cp)
object = strings.Replace(object, "checkpointDataV1", "checkpointData", 1)
object = strings.Replace(object, "PodDevicesEntryV1", "PodDevicesEntry", -1)
hash := fnv.New32a()
fmt.Fprintf(hash, "%v", object)
return checksum.Checksum(hash.Sum32())
}
// DataV1 holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
type DataV1 struct {
Data checkpointDataV1
Checksum checksum.Checksum
}
// NewV1 returns an instance of Checkpoint, in V1 (k8s <= 1.19) format.
// Users should avoid creating checkpoints in formats different from the most recent one,
// use the old formats only to validate existing checkpoint and convert them to most recent
// format. The only exception should be test code.
func NewV1(devEntries []PodDevicesEntryV1,
devices map[string][]string) DeviceManagerCheckpoint {
return &DataV1{
Data: checkpointDataV1{
PodDeviceEntries: devEntries,
RegisteredDevices: devices,
},
}
}
// MarshalCheckpoint is needed to implement the Checkpoint interface, but should not be called anymore
func (cp *DataV1) MarshalCheckpoint() ([]byte, error) {
klog.InfoS("Marshalling a device manager V1 checkpoint")
cp.Checksum = cp.Data.checksum()
return json.Marshal(*cp)
}
// UnmarshalCheckpoint returns unmarshalled data
func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *DataV1) VerifyChecksum() error {
if cp.Checksum != cp.Data.checksum() {
return errors.ErrCorruptCheckpoint
}
return nil
}
// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *DataV1) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
var podDevs []PodDevicesEntry
for _, entryV1 := range cp.Data.PodDeviceEntries {
devsPerNuma := NewDevicesPerNUMA()
// no NUMA cell affinity was recorded. The only possible choice
// is to set all the devices affine to node 0.
devsPerNuma[0] = entryV1.DeviceIDs
podDevs = append(podDevs, PodDevicesEntry{
PodUID: entryV1.PodUID,
ContainerName: entryV1.ContainerName,
ResourceName: entryV1.ResourceName,
DeviceIDs: devsPerNuma,
AllocResp: entryV1.AllocResp,
})
}
return podDevs, cp.Data.RegisteredDevices
}

View File

@ -468,33 +468,19 @@ func (m *ManagerImpl) writeCheckpoint() error {
// Reads device to container allocation information from disk, and populates // Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly. // m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error { func (m *ManagerImpl) readCheckpoint() error {
// the vast majority of time we restore a compatible checkpoint, so we try cp, err := m.getCheckpoint()
// the current version first. Trying to restore older format checkpoints is
// relevant only in the kubelet upgrade flow, which happens once in a
// (long) while.
cp, err := m.getCheckpointV2()
if err != nil { if err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
// no point in trying anything else // no point in trying anything else
klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err) klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
return nil return nil
} }
return err
var errv1 error
// one last try: maybe it's a old format checkpoint?
cp, errv1 = m.getCheckpointV1()
if errv1 != nil {
klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
// intentionally return the parent error. We expect to restore V1 checkpoints
// a tiny fraction of time, so what matters most is the current checkpoint read error.
return err
}
klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
} }
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
podDevices, registeredDevs := cp.GetDataInLatestFormat() podDevices, registeredDevs := cp.GetData()
m.podDevices.fromCheckpointData(podDevices) m.podDevices.fromCheckpointData(podDevices)
m.allocatedDevices = m.podDevices.devices() m.allocatedDevices = m.podDevices.devices()
for resource := range registeredDevs { for resource := range registeredDevs {
@ -507,7 +493,7 @@ func (m *ManagerImpl) readCheckpoint() error {
return nil return nil
} }
func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) { func (m *ManagerImpl) getCheckpoint() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string) registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0) devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs) cp := checkpoint.New(devEntries, registeredDevs)
@ -515,14 +501,6 @@ func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, err
return cp, err return cp, err
} }
func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntryV1, 0)
cp := checkpoint.NewV1(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods. // UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() { func (m *ManagerImpl) UpdateAllocatedDevices() {
if !m.sourcesReady.AllReady() { if !m.sourcesReady.AllReady() {

View File

@ -1771,28 +1771,6 @@ func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]p
return res return res
} }
const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"
var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`
func TestReadPreNUMACheckpoint(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
err = os.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644)
require.NoError(t, err)
topologyStore := topologymanager.NewFakeManager()
nodes := []cadvisorapi.Node{{Id: 0}}
m, err := newManagerImpl(socketName, nodes, topologyStore)
require.NoError(t, err)
// TODO: we should not calling private methods, but among the existing tests we do anyway
err = m.readCheckpoint()
require.NoError(t, err)
}
func TestGetTopologyHintsWithUpdates(t *testing.T) { func TestGetTopologyHintsWithUpdates(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir() socketDir, socketName, _, err := tmpSocketDir()
defer os.RemoveAll(socketDir) defer os.RemoveAll(socketDir)

View File

@ -22,8 +22,6 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"regexp"
"sort"
"strings" "strings"
"time" "time"
@ -33,18 +31,12 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/util"
admissionapi "k8s.io/pod-security-admission/api" admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
"k8s.io/kubernetes/test/e2e/nodefeature" "k8s.io/kubernetes/test/e2e/nodefeature"
testutils "k8s.io/kubernetes/test/utils" testutils "k8s.io/kubernetes/test/utils"
@ -57,227 +49,13 @@ import (
const ( const (
devicePluginDir = "/var/lib/kubelet/device-plugins" devicePluginDir = "/var/lib/kubelet/device-plugins"
checkpointName = "kubelet_internal_checkpoint"
) )
// Serial because the test updates kubelet configuration. // Serial because the test updates kubelet configuration.
var _ = SIGDescribe("Device Manager", framework.WithSerial(), feature.DeviceManager, nodefeature.DeviceManager, func() { var _ = SIGDescribe("Device Manager", framework.WithSerial(), feature.DeviceManager, nodefeature.DeviceManager, func() {
checkpointFullPath := filepath.Join(devicePluginDir, checkpointName)
f := framework.NewDefaultFramework("devicemanager-test") f := framework.NewDefaultFramework("devicemanager-test")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
ginkgo.Context("With SRIOV devices in the system", func() {
// this test wants to reproduce what happened in https://github.com/kubernetes/kubernetes/issues/102880
ginkgo.It("should be able to recover V1 (aka pre-1.20) checkpoint data and reject pods before device re-registration", func(ctx context.Context) {
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 {
e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
}
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
sd := setupSRIOVConfigOrFail(ctx, f, configMap)
waitForSRIOVResources(ctx, f, sd)
cntName := "gu-container"
// we create and delete a pod to make sure the internal device manager state contains a pod allocation
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName))
var initCtnAttrs []tmCtnAttribute
ctnAttrs := []tmCtnAttribute{
{
ctnName: cntName,
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceName: sd.resourceName,
deviceRequest: "1",
deviceLimit: "1",
},
}
podName := "gu-pod-rec-pre-1"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
// now we need to simulate a node drain, so we remove all the pods, including the sriov device plugin.
ginkgo.By("deleting the pod")
// note we delete right now because we know the current implementation of devicemanager will NOT
// clean up on pod deletion. When this changes, the deletion needs to be done after the test is done.
deletePodSyncByName(ctx, f, pod.Name)
waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
ginkgo.By("teardown the sriov device plugin")
// since we will NOT be recreating the plugin, we clean up everything now
teardownSRIOVConfigOrFail(ctx, f, sd)
ginkgo.By("stopping the kubelet")
killKubelet("SIGSTOP")
ginkgo.By("rewriting the kubelet checkpoint file as v1")
err := rewriteCheckpointAsV1(devicePluginDir, checkpointName)
// make sure we remove any leftovers
defer os.Remove(checkpointFullPath)
framework.ExpectNoError(err)
// this mimics a kubelet restart after the upgrade
// TODO: is SIGTERM (less brutal) good enough?
ginkgo.By("killing the kubelet")
killKubelet("SIGKILL")
ginkgo.By("waiting for the kubelet to be ready again")
// Wait for the Kubelet to be ready.
gomega.Eventually(ctx, func(ctx context.Context) bool {
nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
// note we DO NOT start the sriov device plugin. This is intentional.
// issue#102880 reproduces because of a race on startup caused by corrupted device manager
// state which leads to v1.Node object not updated on apiserver.
// So to hit the issue we need to receive the pod *before* the device plugin registers itself.
// The simplest and safest way to reproduce is just avoid to run the device plugin again
podName = "gu-pod-rec-post-2"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod = makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = e2epod.NewPodClient(f).Create(ctx, pod)
err = e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) {
if pod.Status.Phase != v1.PodPending {
return true, nil
}
return false, nil
})
framework.ExpectNoError(err)
pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
if pod.Status.Phase != v1.PodFailed {
framework.Failf("pod %s not failed: %v", pod.Name, pod.Status)
}
framework.Logf("checking pod %s status reason (%s)", pod.Name, pod.Status.Reason)
if !isUnexpectedAdmissionError(pod) {
framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason)
}
deletePodSyncByName(ctx, f, pod.Name)
})
ginkgo.It("should be able to recover V1 (aka pre-1.20) checkpoint data and update topology info on device re-registration", func(ctx context.Context) {
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 {
e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
}
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
sd := setupSRIOVConfigOrFail(ctx, f, configMap)
waitForSRIOVResources(ctx, f, sd)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
conn.Close()
framework.ExpectNoError(err)
suitableDevs := 0
for _, dev := range resp.GetDevices() {
for _, node := range dev.GetTopology().GetNodes() {
if node.GetID() != 0 {
suitableDevs++
}
}
}
if suitableDevs == 0 {
teardownSRIOVConfigOrFail(ctx, f, sd)
e2eskipper.Skipf("no devices found on NUMA Cell other than 0")
}
cntName := "gu-container"
// we create and delete a pod to make sure the internal device manager state contains a pod allocation
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName))
var initCtnAttrs []tmCtnAttribute
ctnAttrs := []tmCtnAttribute{
{
ctnName: cntName,
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceName: sd.resourceName,
deviceRequest: "1",
deviceLimit: "1",
},
}
podName := "gu-pod-rec-pre-1"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
// now we need to simulate a node drain, so we remove all the pods, including the sriov device plugin.
ginkgo.By("deleting the pod")
// note we delete right now because we know the current implementation of devicemanager will NOT
// clean up on pod deletion. When this changes, the deletion needs to be done after the test is done.
deletePodSyncByName(ctx, f, pod.Name)
waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
ginkgo.By("teardown the sriov device plugin")
// no need to delete the config now (speed up later)
deleteSRIOVPodOrFail(ctx, f, sd)
ginkgo.By("stopping the kubelet")
killKubelet("SIGSTOP")
ginkgo.By("rewriting the kubelet checkpoint file as v1")
err = rewriteCheckpointAsV1(devicePluginDir, checkpointName)
// make sure we remove any leftovers
defer os.Remove(checkpointFullPath)
framework.ExpectNoError(err)
// this mimics a kubelet restart after the upgrade
// TODO: is SIGTERM (less brutal) good enough?
ginkgo.By("killing the kubelet")
killKubelet("SIGKILL")
ginkgo.By("waiting for the kubelet to be ready again")
// Wait for the Kubelet to be ready.
gomega.Eventually(ctx, func(ctx context.Context) bool {
nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
sd2 := &sriovData{
configMap: sd.configMap,
serviceAccount: sd.serviceAccount,
}
sd2.pod = createSRIOVPodOrFail(ctx, f)
ginkgo.DeferCleanup(teardownSRIOVConfigOrFail, f, sd2)
waitForSRIOVResources(ctx, f, sd2)
compareSRIOVResources(sd, sd2)
cli, conn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
resp2, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectNoError(err)
cntDevs := stringifyContainerDevices(resp.GetDevices())
cntDevs2 := stringifyContainerDevices(resp2.GetDevices())
if cntDevs != cntDevs2 {
framework.Failf("different allocatable resources expected %v got %v", cntDevs, cntDevs2)
}
})
})
/* /*
This end to end test is to simulate a scenario where after kubelet restart/node This end to end test is to simulate a scenario where after kubelet restart/node
reboot application pods requesting devices appear before the device plugin reboot application pods requesting devices appear before the device plugin
@ -512,90 +290,6 @@ var _ = SIGDescribe("Device Manager", framework.WithSerial(), feature.DeviceMana
}) })
func compareSRIOVResources(expected, got *sriovData) {
if expected.resourceName != got.resourceName {
framework.Failf("different SRIOV resource name: expected %q got %q", expected.resourceName, got.resourceName)
}
if expected.resourceAmount != got.resourceAmount {
framework.Failf("different SRIOV resource amount: expected %d got %d", expected.resourceAmount, got.resourceAmount)
}
}
func isUnexpectedAdmissionError(pod *v1.Pod) bool {
re := regexp.MustCompile(`Unexpected.*Admission.*Error`)
return re.MatchString(pod.Status.Reason)
}
func rewriteCheckpointAsV1(dir, name string) error {
ginkgo.By(fmt.Sprintf("Creating temporary checkpoint manager (dir=%q)", dir))
checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
return err
}
cp := checkpoint.New(make([]checkpoint.PodDevicesEntry, 0), make(map[string][]string))
err = checkpointManager.GetCheckpoint(name, cp)
if err != nil {
return err
}
ginkgo.By(fmt.Sprintf("Read checkpoint %q %#v", name, cp))
podDevices, registeredDevs := cp.GetDataInLatestFormat()
podDevicesV1 := convertPodDeviceEntriesToV1(podDevices)
cpV1 := checkpoint.NewV1(podDevicesV1, registeredDevs)
blob, err := cpV1.MarshalCheckpoint()
if err != nil {
return err
}
// TODO: why `checkpointManager.CreateCheckpoint(name, cpV1)` doesn't seem to work?
ckPath := filepath.Join(dir, name)
os.WriteFile(filepath.Join("/tmp", name), blob, 0600)
return os.WriteFile(ckPath, blob, 0600)
}
func convertPodDeviceEntriesToV1(entries []checkpoint.PodDevicesEntry) []checkpoint.PodDevicesEntryV1 {
entriesv1 := []checkpoint.PodDevicesEntryV1{}
for _, entry := range entries {
deviceIDs := []string{}
for _, perNUMANodeDevIDs := range entry.DeviceIDs {
deviceIDs = append(deviceIDs, perNUMANodeDevIDs...)
}
entriesv1 = append(entriesv1, checkpoint.PodDevicesEntryV1{
PodUID: entry.PodUID,
ContainerName: entry.ContainerName,
ResourceName: entry.ResourceName,
DeviceIDs: deviceIDs,
AllocResp: entry.AllocResp,
})
}
return entriesv1
}
func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) string {
entries := []string{}
for _, dev := range devs {
devIDs := dev.GetDeviceIds()
if devIDs != nil {
for _, devID := range dev.DeviceIds {
nodes := dev.GetTopology().GetNodes()
if nodes != nil {
for _, node := range nodes {
entries = append(entries, fmt.Sprintf("%s[%s]@NUMA=%d", dev.ResourceName, devID, node.GetID()))
}
} else {
entries = append(entries, fmt.Sprintf("%s[%s]@NUMA=none", dev.ResourceName, devID))
}
}
} else {
entries = append(entries, dev.ResourceName)
}
}
sort.Strings(entries)
return strings.Join(entries, ", ")
}
func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod { func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod {
podName := "device-manager-test-" + string(uuid.NewUUID()) podName := "device-manager-test-" + string(uuid.NewUUID())
rl := v1.ResourceList{ rl := v1.ResourceList{

View File

@ -461,18 +461,6 @@ func stopKubelet() func() {
} }
} }
// killKubelet sends a signal (SIGINT, SIGSTOP, SIGTERM...) to the running kubelet
func killKubelet(sig string) {
kubeletServiceName := findKubeletServiceName(true)
// reset the kubelet service start-limit-hit
stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %v", err, stdout)
stdout, err = exec.Command("sudo", "systemctl", "kill", "-s", sig, kubeletServiceName).CombinedOutput()
framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %v", err, stdout)
}
func kubeletHealthCheck(url string) bool { func kubeletHealthCheck(url string) bool {
insecureTransport := http.DefaultTransport.(*http.Transport).Clone() insecureTransport := http.DefaultTransport.(*http.Transport).Clone()
insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}