Merge pull request #118635 from ffromani/devmgr-check-pod-running

kubelet: devices: skip allocation for running pods
This commit is contained in:
Kubernetes Prow Robot 2023-07-15 05:43:16 -07:00 committed by GitHub
commit 900237fada
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 300 additions and 130 deletions

View File

@ -51,7 +51,6 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
@ -563,8 +562,9 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
localStorageCapacityIsolation bool) error {
ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Initialize CPU manager
containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
@ -572,7 +572,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
@ -636,7 +636,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}
@ -699,26 +699,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
}
}
func buildContainerMapFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) containermap.ContainerMap {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(ctx, nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
}
return containerMap
}
func isProcessRunningInHost(pid int) (bool, error) {
// Get init pid namespace.
initPidNs, err := os.Readlink("/proc/1/ns/pid")

View File

@ -23,6 +23,7 @@ limitations under the License.
package cm
import (
"context"
"fmt"
"k8s.io/klog/v2"
@ -86,8 +87,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
}
ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

View File

@ -36,6 +36,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@ -97,6 +98,15 @@ type ManagerImpl struct {
// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod
// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod. Used to detect pods running across a restart
containerMap containermap.ContainerMap
// containerRunningSet identifies which containers among those present in `containerMap`
// were reported running by the container runtime when `containerMap` was computed.
// Used to detect pods running across a restart
containerRunningSet sets.String
}
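Taken together, the two new fields let the manager answer "was this (pod, container) already running when the kubelet came back up?". A minimal sketch of that lookup, using the same containermap and sets calls the diff itself uses; the package wrapper, pod UID and container ID are illustrative only:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
)

func main() {
	// Snapshot computed from the runtime at kubelet startup.
	containers := containermap.NewContainerMap()
	containers.Add("pod-uid-1", "app", "cnt-id-1") // (podUID, containerName, containerID)

	// IDs of the containers the runtime reported as running at snapshot time.
	running := sets.NewString("cnt-id-1")

	// Later, at admission: resolve (pod, container) -> containerID, then check
	// whether that ID was running when the snapshot was taken.
	if cntID, err := containers.GetContainerID("pod-uid-1", "app"); err == nil && running.Has(cntID) {
		fmt.Println("container was already running across the restart")
	}
}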
type endpointInfo struct {
@ -216,6 +226,7 @@ func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin
defer m.mutex.Unlock()
m.endpoints[resourceName] = endpointInfo{e, options}
klog.V(2).InfoS("Device plugin connected", "resourceName", resourceName)
return nil
}
@ -246,6 +257,7 @@ func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *plug
}
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
healthyCount := 0
m.mutex.Lock()
m.healthyDevices[resourceName] = sets.NewString()
m.unhealthyDevices[resourceName] = sets.NewString()
@ -254,6 +266,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
m.allDevices[resourceName][dev.ID] = dev
if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID)
healthyCount++
} else {
m.unhealthyDevices[resourceName].Insert(dev.ID)
}
@ -262,6 +275,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
if err := m.writeCheckpoint(); err != nil {
klog.ErrorS(err, "Writing checkpoint encountered")
}
klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
}
// GetWatcherHandler returns the plugin handler
@ -277,11 +291,13 @@ func (m *ManagerImpl) checkpointFile() string {
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
klog.V(2).InfoS("Starting Device Plugin manager")
m.activePods = activePods
m.sourcesReady = sourcesReady
m.containerMap = initialContainers
m.containerRunningSet = initialContainerRunningSet
// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
@ -545,10 +561,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
}
}
// We have 3 major flows to handle:
// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
// note: if we get this far the runtime is surely running. This is usually enforced at the OS level by startup service dependencies.
// First we take care of the exceptional flows (scenarios 2 and 3). In both, the kubelet is reinitializing, and while it initializes, sources are NOT all ready.
// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for containers reported
// running, then it can only be a kubelet restart: on node reboot the runtime and the containers were shut down as well. And if the container is running, it can only be
// because it already has access to all the required devices, so there is nothing to do and we can bail out.
if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
return nil, nil
}
// We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
healthyDevices, hasRegistered := m.healthyDevices[resource]
// Check if the resource is registered with the device manager
// The following checks are expected to fail only in scenario 3 (node reboot).
// The kubelet is reinitializing and got a container from its sources. But there's no ordering guarantee, so an app container may attempt allocation _before_ the device plugin
// has been created, has registered, and has reported its devices back to the kubelet.
// This can only happen in scenario 3, because at steady state (scenario 1) the scheduler prevents pods from being sent to nodes which don't report enough devices.
// Note: we need to check the device health and registration status *before* we check how many devices are needed; doing otherwise caused issue #109595.
// Note: if the scheduler is bypassed, we fall back to scenario 1, so we still need these checks.
if !hasRegistered {
return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
}
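The decision order described in the comments above, condensed into a free-standing sketch; the parameters stand in for m.sourcesReady.AllReady(), m.isContainerAlreadyRunning(...), the registration lookup and the health checks (all names here are illustrative, not part of the commit):

// decide condenses the three allocation scenarios into a single decision order.
func decide(sourcesAllReady, containerAlreadyRunning, resourceRegistered, devicesHealthy bool, needed int) string {
	// Scenario 2: kubelet restart while the container kept running. The
	// container already has its devices, so there is nothing to allocate.
	if !sourcesAllReady && containerAlreadyRunning {
		return "skip: container already running with its devices"
	}
	// Scenario 3: node reboot; the device plugin may not have registered yet.
	if !resourceRegistered || !devicesHealthy {
		return "fail admission: devices not (yet) available"
	}
	// Scenario 1 (steady state) or a plain container restart.
	if needed == 0 {
		return "no change, no work"
	}
	return "allocate devices"
}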
@ -563,7 +600,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
}
// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back to a common path.
// The same flow also covers container restart at kubelet steady state.
if needed == 0 {
klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
// No change, no work.
return nil, nil
}
@ -1040,3 +1080,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
m.pendingAdmissionPod = pod
}
func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
if err != nil {
klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
return false
}
// note that if the container runtime is down when the kubelet restarts, this set will be empty,
// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
// This scenario should however be rare enough.
if !m.containerRunningSet.Has(cntID) {
klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return false
}
// Once we make it here we know we have a running container.
klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return true
}
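The caveat in the comments above (runtime down while the kubelet restarts) plays out directly in this function: with an empty running set the lookup degrades to "not running" and admission falls through to the normal allocation path. A hypothetical in-package sketch, with the struct literal reduced to the two relevant fields:

m := &ManagerImpl{
	containerMap:        containermap.NewContainerMap(),
	containerRunningSet: sets.NewString(), // empty: runtime was unreachable at startup
}
// GetContainerID fails on the empty map, so this returns false and the
// scenario-2 fast path in devicesToAllocate is not taken.
_ = m.isContainerAlreadyRunning("pod-uid-1", "app")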

View File

@ -41,6 +41,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@ -284,7 +285,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
return []*v1.Pod{}
}
err = w.Start(activePods, &sourcesReadyStub{})
// test steady state; the initialization paths where sourcesReady, containerMap and containerRunningSet
// are relevant will be tested with a different flow
err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString())
require.NoError(t, err)
return w, updateChan
@ -1012,6 +1015,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) {
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
activePods: func() []*v1.Pod { return []*v1.Pod{} },
sourcesReady: &sourcesReadyStub{},
}
testManager.podDevices.insert("pod1", "con1", resourceName1,

View File

@ -20,6 +20,8 @@ import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -31,7 +33,7 @@ import (
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error
// Allocate configures and assigns devices to a container in a pod. From
// the requested device resources, Allocate will communicate with the

View File

@ -17,7 +17,14 @@ limitations under the License.
package cm
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)
@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
}
return ret
}
func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}
runningSet := sets.NewString()
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(ctx, nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
podUID := podSandboxMap[c.PodSandboxId]
containerMap.Add(podUID, c.Metadata.Name, c.Id)
if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
runningSet.Insert(c.Id)
}
}
return containerMap, runningSet
}
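This helper supersedes buildContainerMapFromRuntime (removed in the first file) and is called once in containerManagerImpl.Start; a simplified restatement of that wiring, with error handling elided:

ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// The CPU manager keeps consuming only the map; the device manager now also
// receives the set of container IDs that were running when the snapshot was taken.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
	return err
}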

View File

@ -212,7 +212,7 @@ func (m *manager) RemoveContainer(containerID string) error {
}
func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
klog.InfoS("Topology Admit Handler")
klog.InfoS("Topology Admit Handler", "podUID", attrs.Pod.UID, "podNamespace", attrs.Pod.Namespace, "podName", attrs.Pod.Name)
metrics.TopologyManagerAdmissionRequestsTotal.Inc()
startTime := time.Now()

View File

@ -721,11 +721,14 @@ func hasFailed(hasFailed bool) types.GomegaMatcher {
}).WithTemplate("expected Pod failed {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(hasFailed)
}
func getPod(ctx context.Context, f *framework.Framework, podName string) (bool, error) {
func getPodByName(ctx context.Context, f *framework.Framework, podName string) (*v1.Pod, error) {
return e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{})
}
pod, err := e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{})
func getPod(ctx context.Context, f *framework.Framework, podName string) (bool, error) {
pod, err := getPodByName(ctx, f, podName)
if err != nil {
return false, fmt.Errorf("expected node to get pod=%q got err=%q", pod.Name, err)
return false, err
}
expectedStatusReason := "UnexpectedAdmissionError"

View File

@ -28,20 +28,23 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gcustom"
"github.com/onsi/gomega/types"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubectl/pkg/util/podutils"
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/test/e2e/framework"
@ -239,7 +242,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
// simulate container restart, while all other involved components (kubelet, device plugin) stay stable. To do so, in the container
// entry point we sleep for a limited and short period of time. The device assignment should be kept and be stable across the container
restarts. For the sake of brevity, however, we check just the first restart.
ginkgo.It("Keeps device plugin assignments across pod restarts (no kubelet restart, device plugin re-registration)", func(ctx context.Context) {
ginkgo.It("Keeps device plugin assignments across pod restarts (no kubelet restart, no device plugin restart)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart)
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
@ -266,7 +269,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
if err != nil {
framework.ExpectNoError(err, "getting pod resources assignment after pod restart")
}
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
ginkgo.By("Creating another pod")
@ -285,16 +288,17 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
if err != nil {
framework.ExpectNoError(err, "getting pod resources assignment after pod restart")
}
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod1")
err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2})
err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2})
framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2")
})
// simulate kubelet restart, *but not* device plugin re-registration, while the pod and the container stays running.
// simulate kubelet restart. A compliant device plugin is expected to re-register, while the pod and the container stay running.
// The flow with a buggy or slow device plugin is deferred to another test.
// The device assignment should be kept and be stable across the kubelet restart, because it's the kubelet which performs the device allocation,
// and both the device plugin and the actual consumer (container) are stable.
ginkgo.It("Keeps device plugin assignments across kubelet restarts (no pod restart, no device plugin re-registration)", func(ctx context.Context) {
ginkgo.It("Keeps device plugin assignments across kubelet restarts (no pod restart, no device plugin restart)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
@ -304,6 +308,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.Logf("testing pod: pre-restart UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1))
ginkgo.By("Restarting Kubelet")
restartKubelet(true)
@ -319,12 +324,16 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount
}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
gomega.Eventually(ctx, getPod).
ginkgo.By("Checking the same instance of the pod is still running")
gomega.Eventually(ctx, getPodByName).
WithArguments(f, pod1.Name).
WithTimeout(time.Minute).
Should(HaveFailedWithAdmissionError(),
"the pod succeeded to start, when it should fail with the admission error")
Should(BeTheSamePodStillRunning(pod1),
"the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts")
pod2, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.Logf("testing pod: post-restart UID=%s namespace=%s name=%s ready=%v", pod2.UID, pod2.Namespace, pod2.Name, podutils.IsPodReady(pod2))
// crosscheck that the device assignment is preserved and stable from the perspective of the kubelet.
// note we don't check again the logs of the container: the check is done at startup, the container
@ -336,14 +345,14 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
})
// simulate kubelet and container restart, *but not* device plugin re-registration.
// simulate kubelet and container restart, *but not* device plugin restart.
// The device assignment should be kept and be stable across the kubelet and container restart, because it's the kubelet which
// performs the device allocation, and the device plugin is stable.
ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin re-registration)", func(ctx context.Context) {
ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin restart)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart)
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
@ -372,24 +381,34 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
ginkgo.By("Wait for node to be ready again")
e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute)
ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
gomega.Eventually(ctx, getPod).
ginkgo.By("Checking an instance of the pod is running")
gomega.Eventually(ctx, getPodByName).
WithArguments(f, pod1.Name).
WithTimeout(time.Minute).
Should(HaveFailedWithAdmissionError(),
"the pod succeeded to start, when it should fail with the admission error")
Should(gomega.And(
BeAPodInPhase(v1.PodRunning),
BeAPodReady(),
),
"the pod should still be running, the workload should not be perturbed by kubelet restarts")
// crosscheck that the device assignment is preserved and stable from the perspective of the kubelet.
// note we don't check again the logs of the container: the check is done at startup, the container
// never restarted (runs "forever" from this test timescale perspective) hence re-doing this check
// is useless.
ginkgo.By("Verifying the device assignment after kubelet restart using podresources API")
ginkgo.By("Verifying the device assignment after pod and kubelet restart using container logs")
var devID1Restarted string
gomega.Eventually(ctx, func() string {
devID1Restarted, err = parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE)
if err != nil {
framework.Logf("error getting logds for pod %q: %v", pod1.Name, err)
return ""
}
return devID1Restarted
}, 30*time.Second, framework.Poll).Should(gomega.Equal(devID1), "pod %s reports a different device after restarts: %s (expected %s)", pod1.Name, devID1Restarted, devID1)
ginkgo.By("Verifying the device assignment after pod and kubelet restart using podresources API")
gomega.Eventually(ctx, func() error {
v1PodResources, err = getV1NodeDevices(ctx)
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
})
@ -397,8 +416,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
// After the device plugin has re-registered, the list of healthy devices is repopulated based on the devices discovered.
// Once Pod2 is running we determine the device that was allocated to it. As long as the device allocation succeeds the
// test should pass.
ginkgo.It("Keeps device plugin assignments after the device plugin has been re-registered (no kubelet, pod restart)", func(ctx context.Context) {
ginkgo.It("Keeps device plugin assignments after the device plugin has restarted (no kubelet restart, pod restart)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
@ -433,43 +451,32 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount
}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
// crosscheck that after device plugin re-registration the device assignment is preserved and
// crosscheck that after device plugin restart the device assignment is preserved and
// stable from the kubelet's perspective.
// note we don't check again the logs of the container: the check is done at startup, the container
// never restarted (runs "forever" from this test timescale perspective) hence re-doing this check
// is useless.
ginkgo.By("Verifying the device assignment after device plugin re-registration using podresources API")
ginkgo.By("Verifying the device assignment after device plugin restart using podresources API")
gomega.Eventually(ctx, func() error {
v1PodResources, err = getV1NodeDevices(ctx)
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
ginkgo.By("Creating another pod")
pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod2.Name, f.Namespace.Name, 1*time.Minute)
framework.ExpectNoError(err)
ginkgo.By("Checking that pod got a fake device")
devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE)
framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name)
gomega.Expect(devID2).To(gomega.Not(gomega.Equal("")), "pod2 requested a device but started successfully without")
})
// simulate kubelet restart *and* device plugin re-registration, while the pod and the container stays running.
// simulate kubelet restart *and* device plugin restart, while the pod and the container stay running.
// The device assignment should be kept and be stable across the kubelet/device plugin restart, as both the aforementioned components
// orchestrate the device allocation: the actual consumer (container) is stable.
ginkgo.It("Keeps device plugin assignments after kubelet restart and device plugin has been re-registered (no pod restart)", func(ctx context.Context) {
ginkgo.It("Keeps device plugin assignments after kubelet restart and device plugin restart (no pod restart)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) // the pod has to run "forever" in the timescale of this test
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE)
framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name)
gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without")
gomega.Expect(devID1).To(gomega.Not(gomega.BeEmpty()), "pod1 requested a device but started successfully without")
pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
@ -480,12 +487,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
ginkgo.By("Wait for node to be ready again")
e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute)
ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
gomega.Eventually(ctx, getPod).
ginkgo.By("Checking the same instance of the pod is still running after kubelet restart")
gomega.Eventually(ctx, getPodByName).
WithArguments(f, pod1.Name).
WithTimeout(time.Minute).
Should(HaveFailedWithAdmissionError(),
"the pod succeeded to start, when it should fail with the admission error")
Should(BeTheSamePodStillRunning(pod1),
"the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts")
// crosscheck that the device assignment is preserved and stable from the perspective of the kubelet.
// note we don't check again the logs of the container: the check is done at startup, the container
@ -497,7 +504,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
ginkgo.By("Re-Register resources by deleting the plugin pod")
@ -511,7 +518,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
ginkgo.By("Recreating the plugin pod")
devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate)
ginkgo.By("Waiting for resource to become available on the local node after re-registration")
ginkgo.By("Waiting for resource to become available on the local node after restart")
gomega.Eventually(ctx, func() bool {
node, ready := getLocalTestNode(ctx, f)
return ready &&
@ -519,22 +526,68 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount
}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
ginkgo.By("Creating another pod")
pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
ginkgo.By("Checking the same instance of the pod is still running after the device plugin restart")
gomega.Eventually(ctx, getPodByName).
WithArguments(f, pod1.Name).
WithTimeout(time.Minute).
Should(BeTheSamePodStillRunning(pod1),
"the same pod instance not running across kubelet restarts, workload should not be perturbed by device plugins restarts")
})
ginkgo.By("Checking that pod got a fake device")
devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE)
framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name)
ginkgo.It("[OrphanedPods] Ensures pods consuming devices deleted while kubelet is down are cleaned up correctly", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart)
pod := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
ginkgo.By("Verifying the device assignment after kubelet restart and device plugin re-registration using podresources API")
// note we don't use eventually: the kubelet is supposed to be running and stable by now, so the call should just succeed
v1PodResources, err = getV1NodeDevices(ctx)
if err != nil {
framework.ExpectNoError(err, "getting pod resources assignment after pod restart")
deviceIDRE := "stub devices: (Dev-[0-9]+)"
devID, err := parseLog(ctx, f, pod.Name, pod.Name, deviceIDRE)
framework.ExpectNoError(err, "getting logs for pod %q", pod.Name)
gomega.Expect(devID).To(gomega.Not(gomega.BeEmpty()), "pod requested a device but started successfully without")
pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
ginkgo.By("stopping the kubelet")
startKubelet := stopKubelet()
// wait until the kubelet health check fails
gomega.Eventually(ctx, func() bool {
ok := kubeletHealthCheck(kubeletHealthCheckURL)
framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok)
return ok
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse())
framework.Logf("Delete the pod while the kubelet is not running")
// Delete pod sync by name will force delete the pod, removing it from kubelet's config
deletePodSyncByName(ctx, f, pod.Name)
framework.Logf("Starting the kubelet")
startKubelet()
// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func() bool {
ok := kubeletHealthCheck(kubeletHealthCheckURL)
framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok)
return ok
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue())
framework.Logf("wait for the pod %v to disappear", pod.Name)
gomega.Eventually(ctx, func(ctx context.Context) error {
err := checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace)
framework.Logf("pod %s/%s disappear check err=%v", pod.Namespace, pod.Name, err)
return err
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil())
waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
ginkgo.By("Verifying the device assignment after device plugin restart using podresources API")
gomega.Eventually(ctx, func() error {
v1PodResources, err = getV1NodeDevices(ctx)
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err, allocated := checkPodResourcesAssignment(v1PodResources, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, SampleDeviceResourceName, []string{})
if err == nil || allocated {
framework.Fail(fmt.Sprintf("stale device assignment after pod deletion while kubelet was down allocated=%v error=%v", allocated, err))
}
err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2})
framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2")
})
})
}
@ -542,10 +595,10 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() {
var devicePluginPod *v1.Pod
var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse
var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse
var triggerPathFile, triggerPathDir string
var err error
ginkgo.BeforeEach(func(ctx context.Context) {
ginkgo.By("Wait for node to be ready")
gomega.Eventually(ctx, func(ctx context.Context) bool {
@ -560,50 +613,51 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
// This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of its resources may still be in progress.
// xref: https://issue.k8s.io/115381
gomega.Eventually(ctx, func(ctx context.Context) error {
v1alphaPodResources, err = getV1alpha1NodeDevices(ctx)
if err != nil {
return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err)
}
v1PodResources, err = getV1NodeDevices(ctx)
if err != nil {
return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err)
}
if len(v1alphaPodResources.PodResources) > 0 {
return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources)
}
if len(v1PodResources.PodResources) > 0 {
return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources)
}
return nil
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed())
}, f.Timeouts.SystemDaemonsetStartup, f.Timeouts.Poll).Should(gomega.Succeed())
ginkgo.By("Setting up the directory and file for controlling registration")
ginkgo.By("Setting up the directory for controlling registration")
triggerPathDir = filepath.Join(devicePluginDir, "sample")
if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(triggerPathDir, os.ModePerm)
if err != nil {
klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err)
panic(err)
}
klog.InfoS("Directory created successfully")
triggerPathFile = filepath.Join(triggerPathDir, "registration")
if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) {
_, err = os.Create(triggerPathFile)
if err != nil {
klog.Errorf("File creation %s failed: %v ", triggerPathFile, err)
panic(err)
if _, err := os.Stat(triggerPathDir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if err := os.Mkdir(triggerPathDir, os.ModePerm); err != nil {
framework.Fail(fmt.Sprintf("registration control directory %q creation failed: %v ", triggerPathDir, err))
}
framework.Logf("registration control directory created successfully")
} else {
framework.Fail(fmt.Sprintf("unexpected error checking %q: %v", triggerPathDir, err))
}
} else {
framework.Logf("registration control directory %q already present", triggerPathDir)
}
ginkgo.By("Setting up the file trigger for controlling registration")
triggerPathFile = filepath.Join(triggerPathDir, "registration")
if _, err := os.Stat(triggerPathFile); err != nil {
if errors.Is(err, os.ErrNotExist) {
if _, err = os.Create(triggerPathFile); err != nil {
framework.Fail(fmt.Sprintf("registration control file %q creation failed: %v", triggerPathFile, err))
}
framework.Logf("registration control file created successfully")
} else {
framework.Fail(fmt.Sprintf("unexpected error creating %q: %v", triggerPathFile, err))
}
} else {
framework.Logf("registration control file %q already present", triggerPathFile)
}
ginkgo.By("Scheduling a sample device plugin pod")
data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML)
if err != nil {
framework.Fail(err.Error())
framework.Fail(fmt.Sprintf("error reading test data %q: %v", SampleDevicePluginControlRegistrationDSYAML, err))
}
ds := readDaemonSetV1OrDie(data)
@ -675,8 +729,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
// simulate node reboot scenario by removing pods using CRI before kubelet is started. In addition to that,
// intentionally a scenario is created where after node reboot, application pods requesting devices appear before the device plugin pod
// exposing those devices as resource has re-registers itself to Kubelet. The expected behavior is that the application pod fails at
// admission time.
// exposing those devices as a resource has restarted. The expected behavior is that the application pod fails at admission time.
ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
@ -728,7 +781,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after node reboot")
})
@ -796,7 +849,7 @@ func parseLog(ctx context.Context, f *framework.Framework, podName string, contN
return matches[1], nil
}
func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) error {
func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) (error, bool) {
for _, podRes := range v1PodRes.PodResources {
if podRes.Namespace != podNamespace || podRes.Name != podName {
continue
@ -808,10 +861,12 @@ func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResource
return matchContainerDevices(podNamespace+"/"+podName+"/"+containerName, contRes.Devices, resourceName, devs)
}
}
return fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName)
err := fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName)
framework.Logf("%v", err)
return err, false
}
func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) error {
func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) (error, bool) {
expected := sets.New[string](devs...)
assigned := sets.New[string]()
for _, contDev := range contDevs {
@ -824,9 +879,9 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta
assignedStr := strings.Join(assigned.UnsortedList(), ",")
framework.Logf("%s: devices expected %q assigned %q", ident, expectedStr, assignedStr)
if !assigned.Equal(expected) {
return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr)
return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr), true
}
return nil
return nil, true
}
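With the second return value, callers can distinguish "no assignment found" (false) from "an assignment was found, matching or not" (true). The [OrphanedPods] test above relies on exactly this to assert that a pod deleted while the kubelet was down left no stale assignment; the same pattern in isolation, with placeholder namespace, pod and container names:

err, allocated := checkPodResourcesAssignment(v1PodResources, "ns", "pod", "cnt", SampleDeviceResourceName, []string{})
if err == nil || allocated {
	framework.Fail(fmt.Sprintf("stale device assignment: allocated=%v error=%v", allocated, err))
}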
// getSampleDevicePluginPod returns the Sample Device Plugin pod to be used in e2e tests.
@ -851,3 +906,32 @@ func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod {
return dp
}
func BeTheSamePodStillRunning(expected *v1.Pod) types.GomegaMatcher {
return gomega.And(
BeTheSamePodAs(expected.UID),
BeAPodInPhase(v1.PodRunning),
BeAPodReady(),
)
}
// BeAPodReady matches if the pod is reported ready
func BeAPodReady() types.GomegaMatcher {
return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) {
return podutils.IsPodReady(actual), nil
}).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} UID {{.Actual.UID}} not ready yet")
}
// BeAPodInPhase matches if the pod is in the given phase
func BeAPodInPhase(phase v1.PodPhase) types.GomegaMatcher {
return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) {
return actual.Status.Phase == phase, nil
}).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} failed {{.To}} be in phase {{.Data}} instead is in phase {{.Actual.Status.Phase}}").WithTemplateData(phase)
}
// BeTheSamePodAs matches if the pod has the given UID
func BeTheSamePodAs(podUID k8stypes.UID) types.GomegaMatcher {
return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) {
return actual.UID == podUID, nil
}).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} expected UID {{.Data}} has UID instead {{.Actual.UID}}").WithTemplateData(podUID)
}
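The composite matcher is meant for polling, as the restart tests above do; typical usage, with pod1 being the pod object captured before the restart:

gomega.Eventually(ctx, getPodByName).
	WithArguments(f, pod1.Name).
	WithTimeout(time.Minute).
	Should(BeTheSamePodStillRunning(pod1),
		"the same pod instance is not running across kubelet restarts")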