set AllocatedResourcesStatus in the Pod Status

Sergey Kanzhelev 2024-07-22 05:22:16 +00:00
parent 3790ee2fe8
commit 62f96d2748
20 changed files with 614 additions and 32 deletions

View File

@ -10679,6 +10679,7 @@
"io.k8s.api.core.v1.ResourceStatus": {
"properties": {
"name": {
"description": "Name of the resource. Must be unique within the pod and match one of the resources from the pod spec.",
"type": "string"
},
"resources": {

View File

@ -6817,6 +6817,7 @@
"properties": {
"name": {
"default": "",
"description": "Name of the resource. Must be unique within the pod and match one of the resources from the pod spec.",
"type": "string"
},
"resources": {

View File

@ -1172,22 +1172,6 @@ func rroInUse(podSpec *api.PodSpec) bool {
return inUse
}
func allocatedResourcesStatusInUse(podSpec *api.PodStatus) bool {
if podSpec == nil {
return false
}
inUse := func(csl []api.ContainerStatus) bool {
for _, cs := range csl {
if len(cs.AllocatedResourcesStatus) > 0 {
return true
}
}
return false
}
return inUse(podSpec.ContainerStatuses) || inUse(podSpec.InitContainerStatuses) || inUse(podSpec.EphemeralContainerStatuses)
}
func dropDisabledClusterTrustBundleProjection(podSpec, oldPodSpec *api.PodSpec) {
if utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) {
return

View File

@ -8209,11 +8209,10 @@ func validateContainerStatusNoAllocatedResourcesStatus(containerStatuses []core.
allErrors := field.ErrorList{}
for i, containerStatus := range containerStatuses {
if containerStatus.AllocatedResourcesStatus == nil {
if len(containerStatus.AllocatedResourcesStatus) == 0 {
continue
} else {
allErrors = append(allErrors, field.Forbidden(fldPath.Index(i).Child("allocatedResourcesStatus"), "cannot be set for a container status"))
}
allErrors = append(allErrors, field.Forbidden(fldPath.Index(i).Child("allocatedResourcesStatus"), "must not be specified in container status"))
}
return allErrors
@ -8263,12 +8262,18 @@ func validateContainerStatusAllocatedResourcesStatus(containerStatuses []core.Co
uniqueResources := sets.New[core.ResourceID]()
// check resource IDs are unique
for k, r := range allocatedResource.Resources {
if r.Health != core.ResourceHealthStatusHealthy && r.Health != core.ResourceHealthStatusUnhealthy && r.Health != core.ResourceHealthStatusUnknown {
allErrors = append(allErrors, field.Invalid(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("health"), r.Health, "must be one of Healthy, Unhealthy, Unknown"))
var supportedResourceHealthValues = sets.New(
core.ResourceHealthStatusHealthy,
core.ResourceHealthStatusUnhealthy,
core.ResourceHealthStatusUnknown)
if !supportedResourceHealthValues.Has(r.Health) {
allErrors = append(allErrors, field.NotSupported(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("health"), r.Health, sets.List(supportedResourceHealthValues)))
}
if uniqueResources.Has(r.ResourceID) {
allErrors = append(allErrors, field.Invalid(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("resourceID"), r.ResourceID, "must be unique"))
allErrors = append(allErrors, field.Duplicate(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("resourceID"), r.ResourceID))
} else {
uniqueResources.Insert(r.ResourceID)
}

View File

@ -24457,11 +24457,11 @@ func TestValidateContainerStatusNoAllocatedResourcesStatus(t *testing.T) {
errs := validateContainerStatusNoAllocatedResourcesStatus(containerStatuses, fldPath)
assert.Equal(t, 2, len(errs))
assert.Len(t, errs, 2)
assert.Equal(t, "spec.containers[1].allocatedResourcesStatus", errs[0].Field)
assert.Equal(t, "cannot be set for a container status", errs[0].Detail)
assert.Equal(t, "must not be specified in container status", errs[0].Detail)
assert.Equal(t, "spec.containers[2].allocatedResourcesStatus", errs[1].Field)
assert.Equal(t, "cannot be set for a container status", errs[1].Detail)
assert.Equal(t, "must not be specified in container status", errs[1].Detail)
}
func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) {
@ -24580,7 +24580,7 @@ func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) {
},
},
wantFieldErrors: field.ErrorList{
field.Invalid(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(1).Child("resourceID"), core.ResourceID("resource-1"), "must be unique"),
field.Duplicate(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(1).Child("resourceID"), core.ResourceID("resource-1")),
},
},
@ -24619,6 +24619,38 @@ func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) {
field.Invalid(fldPath.Index(0).Child("allocatedResourcesStatus").Index(1).Child("name"), core.ResourceName("test.device/test2"), "must match one of the container's resource requirements"),
},
},
"don't allow health status outside the known values": {
containers: []core.Container{
{
Name: "container-1",
Resources: core.ResourceRequirements{
Requests: core.ResourceList{
"test.device/test": resource.MustParse("1"),
},
},
},
},
containerStatuses: []core.ContainerStatus{
{
Name: "container-1",
AllocatedResourcesStatus: []core.ResourceStatus{
{
Name: "test.device/test",
Resources: []core.ResourceHealth{
{
ResourceID: "resource-1",
Health: "invalid-health-value",
},
},
},
},
},
},
wantFieldErrors: field.ErrorList{
field.NotSupported(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("health"), core.ResourceHealthStatus("invalid-health-value"), []string{"Healthy", "Unhealthy", "Unknown"}),
},
},
}
for name, tt := range testCases {
t.Run(name, func(t *testing.T) {

View File

@ -664,6 +664,13 @@ const (
// No effect for other cases such as using serverTLSBootstrap.
ReloadKubeletServerCertificateFile featuregate.Feature = "ReloadKubeletServerCertificateFile"
// owner: @SergeyKanzhelev
// kep: https://kep.k8s.io/4680
// alpha: v1.31
//
// Adds the AllocatedResourcesStatus to the container status.
ResourceHealthStatus featuregate.Feature = "ResourceHealthStatus"
// owner: @mikedanese
// alpha: v1.7
// beta: v1.12
@ -1150,6 +1157,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ReloadKubeletServerCertificateFile: {Default: true, PreRelease: featuregate.Beta},
ResourceHealthStatus: {Default: false, PreRelease: featuregate.Alpha},
RotateKubeletServerCertificate: {Default: true, PreRelease: featuregate.Beta},
RuntimeClassInImageCriAPI: {Default: false, PreRelease: featuregate.Alpha},
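Since the gate defaults to false at this alpha stage, the behavior has to be enabled explicitly; a hedged example of the standard feature-gate opt-in on the kubelet command line (depending on the cluster setup, the same gate may also need to be enabled on the kube-apiserver so the new status field is not dropped):

kubelet --feature-gates=ResourceHealthStatus=true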

View File

@ -32,6 +32,7 @@ import (
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -132,6 +133,12 @@ type ContainerManager interface {
// might need to unprepare resources.
PodMightNeedToUnprepareResources(UID types.UID) bool
// UpdateAllocatedResourcesStatus updates the status of allocated resources for the pod.
UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus)
// Updates returns a channel that receives an Update when a device changes its status.
Updates() <-chan resourceupdates.Update
// Implements the PodResources Provider API
podresources.CPUsProvider
podresources.DevicesProvider

View File

@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -1027,3 +1028,16 @@ func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error {
func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
return cm.draManager.PodMightNeedToUnprepareResources(UID)
}
func (cm *containerManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
// For now we only support Device Plugin
cm.deviceManager.UpdateAllocatedResourcesStatus(pod, status)
// TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources which is planned for the next iteration of a KEP.
}
func (cm *containerManagerImpl) Updates() <-chan resourceupdates.Update {
// TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources, for now only use device plugin updates. DRA support is planned for the next iteration of a KEP.
return cm.deviceManager.Updates()
}

View File

@ -28,6 +28,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -181,6 +182,13 @@ func (cm *containerManagerStub) PodMightNeedToUnprepareResources(UID types.UID)
return false
}
func (cm *containerManagerStub) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
}
func (cm *containerManagerStub) Updates() <-chan resourceupdates.Update {
return nil
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -205,6 +206,19 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
return opts, nil
}
func (cm *containerManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
// For now we only support Device Plugin
cm.deviceManager.UpdateAllocatedResourcesStatus(pod, status)
// TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources when DRA supports Windows
}
func (cm *containerManagerImpl) Updates() <-chan resourceupdates.Update {
// TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources, for now only use device plugin updates
return cm.deviceManager.Updates()
}
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.UpdatePluginResources(node, attrs)
}

View File

@ -33,12 +33,15 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -108,6 +111,9 @@ type ManagerImpl struct {
// was reported running by the container runtime when `containerMap` was computed.
// Used to detect pods running across a restart
containerRunningSet sets.Set[string]
// update channel for device health updates
update chan resourceupdates.Update
}
type endpointInfo struct {
@ -151,6 +157,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
numaNodes: numaNodes,
topologyAffinityStore: topologyAffinityStore,
devicesToReuse: make(PodReusableDevices),
update: make(chan resourceupdates.Update),
}
server, err := plugin.NewServer(socketPath, manager, manager)
@ -174,6 +181,10 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
return manager, nil
}
func (m *ManagerImpl) Updates() <-chan resourceupdates.Update {
return m.update
}
// CleanupPluginDirectory is to remove all existing unix sockets
// from /var/lib/kubelet/device-plugins on Device Plugin Manager start
func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
@ -259,8 +270,26 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
m.mutex.Lock()
m.healthyDevices[resourceName] = sets.New[string]()
m.unhealthyDevices[resourceName] = sets.New[string]()
oldDevices := m.allDevices[resourceName]
podsToUpdate := sets.New[string]()
m.allDevices[resourceName] = make(map[string]pluginapi.Device)
for _, dev := range devices {
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
// compare with old device's health and send update to the channel if needed
if oldDev, ok := oldDevices[dev.ID]; ok {
if oldDev.Health != dev.Health {
podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)
podsToUpdate.Insert(podUID)
}
} else {
// if this is a new device, it might have existed before and disappeared for a while
// but still be assigned to a Pod. In this case, we need to send an update to the channel
podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)
podsToUpdate.Insert(podUID)
}
}
m.allDevices[resourceName][dev.ID] = dev
if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID)
@ -270,6 +299,15 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
}
}
m.mutex.Unlock()
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
if len(podsToUpdate) > 0 {
m.update <- resourceupdates.Update{
PodUIDs: podsToUpdate.UnsortedList(),
}
}
}
if err := m.writeCheckpoint(); err != nil {
klog.ErrorS(err, "Writing checkpoint encountered")
}
@ -1048,6 +1086,70 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceIns
return m.podDevices.getContainerDevices(podUID, containerName)
}
func (m *ManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
m.mutex.Lock()
defer m.mutex.Unlock()
// Today we ignore edge cases that are unlikely to happen:
// - updating statuses for containers that are in the spec, but not in the status
// - updating statuses for resources requested in the spec, but with no information in podDevices
for i, containerStatus := range status.ContainerStatuses {
devices := m.podDevices.getContainerDevices(string(pod.UID), containerStatus.Name)
for resourceName, deviceInstances := range devices {
for id, d := range deviceInstances {
health := pluginapi.Healthy
// this is unlikely, but check for existence here anyway
if r, ok := m.allDevices[resourceName]; ok {
if _, ok := r[id]; ok {
health = m.allDevices[resourceName][id].Health
}
}
d.Health = health
deviceInstances[id] = d
}
}
for resourceName, dI := range devices {
resourceStatus := v1.ResourceStatus{
Name: v1.ResourceName(resourceName),
Resources: []v1.ResourceHealth{},
}
for id, d := range dI {
health := v1.ResourceHealthStatusHealthy
if d.Health != pluginapi.Healthy {
health = v1.ResourceHealthStatusUnhealthy
}
resourceStatus.Resources = append(resourceStatus.Resources, v1.ResourceHealth{
ResourceID: v1.ResourceID(id),
Health: health,
})
}
if status.ContainerStatuses[i].AllocatedResourcesStatus == nil {
status.ContainerStatuses[i].AllocatedResourcesStatus = []v1.ResourceStatus{}
}
// look up the resource status by name and update it
found := false
for j, rs := range status.ContainerStatuses[i].AllocatedResourcesStatus {
if rs.Name == resourceStatus.Name {
status.ContainerStatuses[i].AllocatedResourcesStatus[j] = resourceStatus
found = true
break
}
}
if !found {
status.ContainerStatuses[i].AllocatedResourcesStatus = append(status.ContainerStatuses[i].AllocatedResourcesStatus, resourceStatus)
}
}
}
}
// ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not,
// depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
// has been recreated.

View File

@ -1850,3 +1850,84 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) {
})
}
}
func TestUpdateAllocatedResourcesStatus(t *testing.T) {
podUID := "test-pod-uid"
containerName := "test-container"
resourceName := "test-resource"
tmpDir, err := os.MkdirTemp("", "checkpoint")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer func() {
err = os.RemoveAll(tmpDir)
if err != nil {
t.Fatalf("Fail to remove tmpdir: %v", err)
}
}()
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
t.Fatalf("failed to create checkpoint manager: %v", err)
}
testManager := &ManagerImpl{
endpoints: make(map[string]endpointInfo),
healthyDevices: make(map[string]sets.Set[string]),
unhealthyDevices: make(map[string]sets.Set[string]),
allocatedDevices: make(map[string]sets.Set[string]),
allDevices: make(map[string]DeviceInstances),
podDevices: newPodDevices(),
checkpointManager: ckm,
}
testManager.podDevices.insert(podUID, containerName, resourceName,
constructDevices([]string{"dev1", "dev2"}),
newContainerAllocateResponse(
withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
),
)
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
{ID: "dev1", Health: pluginapi.Healthy},
{ID: "dev2", Health: pluginapi.Unhealthy},
})
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(podUID),
},
}
status := &v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: containerName,
},
},
}
testManager.UpdateAllocatedResourcesStatus(pod, status)
expectedStatus := v1.ResourceStatus{
Name: v1.ResourceName(resourceName),
Resources: []v1.ResourceHealth{
{
ResourceID: "dev1",
Health: pluginapi.Healthy,
},
{
ResourceID: "dev2",
Health: pluginapi.Unhealthy,
},
},
}
expectedContainerStatuses := []v1.ContainerStatus{
{
Name: containerName,
AllocatedResourcesStatus: []v1.ResourceStatus{expectedStatus},
},
}
if !reflect.DeepEqual(status.ContainerStatuses, expectedContainerStatuses) {
t.Errorf("UpdateAllocatedResourcesStatus failed, expected: %v, got: %v", expectedContainerStatuses, status.ContainerStatuses)
}
}

View File

@ -183,6 +183,22 @@ func (pdev *podDevices) devices() map[string]sets.Set[string] {
return ret
}
// Returns podUID and containerName for a device
func (pdev *podDevices) getPodAndContainerForDevice(deviceID string) (string, string) {
pdev.RLock()
defer pdev.RUnlock()
for podUID, containerDevices := range pdev.devs {
for containerName, resources := range containerDevices {
for _, devices := range resources {
if devices.deviceIds.Devices().Has(deviceID) {
return podUID, containerName
}
}
}
}
return "", ""
}
// Turns podDevices to checkpointData.
func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
var data []checkpoint.PodDevicesEntry

View File

@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -63,6 +64,9 @@ type Manager interface {
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) ResourceDeviceInstances
// UpdateAllocatedResourcesStatus updates the status of allocated resources for the pod.
UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus)
// GetAllocatableDevices returns information about all the devices known to the manager
GetAllocatableDevices() ResourceDeviceInstances
@ -81,6 +85,9 @@ type Manager interface {
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
// Updates returns a channel that receives an Update when a device changes its status.
Updates() <-chan resourceupdates.Update
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

View File

@ -27,6 +27,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -253,3 +254,8 @@ func (cm *FakeContainerManager) UnprepareDynamicResources(*v1.Pod) error {
func (cm *FakeContainerManager) PodMightNeedToUnprepareResources(UID types.UID) bool {
return false
}
func (cm *FakeContainerManager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
}
func (cm *FakeContainerManager) Updates() <-chan resourceupdates.Update {
return nil
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceupdates
// Update represents an update to one or more pods when
// a resource changes its status.
// Later we may need to add fields like container name, resource name, and the new status.
type Update struct {
// PodUIDs are the UIDs of the pods whose status needs to be updated.
PodUIDs []string
}
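A minimal sketch of the intended producer/consumer contract, assuming the package import path shown elsewhere in this commit ("k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"); the pod UID is a placeholder. The device manager pushes the UIDs of affected pods onto the channel, and the kubelet sync loop re-syncs those pods.

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
)

func main() {
    // Producer side (device manager): report that a device backing this pod changed health.
    updates := make(chan resourceupdates.Update, 1)
    updates <- resourceupdates.Update{PodUIDs: []string{"11111111-2222-3333-4444-555555555555"}}

    // Consumer side (kubelet sync loop): pick up the update and re-sync every pod it names.
    u := <-updates
    fmt.Println("pods to re-sync:", u.PodUIDs)
}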

View File

@ -2473,6 +2473,23 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case update := <-kl.containerManager.Updates():
pods := []*v1.Pod{}
for _, p := range update.PodUIDs {
if pod, ok := kl.podManager.GetPodByUID(types.UID(p)); ok {
klog.V(3).InfoS("SyncLoop (containermanager): event for pod", "pod", klog.KObj(pod), "event", update)
pods = append(pods, pod)
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (containermanager): pod does not exist, ignore devices updates", "event", update)
}
}
if len(pods) > 0 {
// Update the pods by syncing them again.
// We do not apply the optimization of updating the status directly, but may do so later.
handler.HandlePodSyncs(pods)
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,

View File

@ -1834,6 +1834,11 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
// ensure the probe managers have up to date status for containers
kl.probeManager.UpdatePodStatus(pod, s)
// update the allocated resources status
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
kl.containerManager.UpdateAllocatedResourcesStatus(pod, s)
}
// preserve all conditions not owned by the kubelet
s.Conditions = make([]v1.PodCondition, 0, len(pod.Status.Conditions)+1)
for _, c := range pod.Status.Conditions {
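Once the kubelet publishes this status, an external consumer (for example a monitoring agent) can read it back from the API server; a hedged client-go sketch, assuming a client library version that already includes the new field, with the namespace and pod name as placeholders:

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // Placeholder namespace and pod name; in-cluster config is assumed.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)
    pod, err := client.CoreV1().Pods("default").Get(context.TODO(), "my-pod", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    for _, c := range pod.Status.ContainerStatuses {
        for _, rs := range c.AllocatedResourcesStatus {
            for _, r := range rs.Resources {
                fmt.Printf("container=%s resource=%s id=%s health=%s\n", c.Name, rs.Name, r.ResourceID, r.Health)
            }
        }
    }
}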

View File

@ -0,0 +1,249 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e_node/testdeviceplugin"
)
var _ = SIGDescribe("Device Plugin Failures Pod Status:", framework.WithFeatureGate(features.ResourceHealthStatus), func() {
f := framework.NewDefaultFramework("device-plugin-failures")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
type ResourceValue struct {
Allocatable int
Capacity int
}
var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue {
ginkgo.GinkgoHelper()
node := getLocalNode(ctx, f)
// -1 represents that the resource is not found
result := ResourceValue{
Allocatable: -1,
Capacity: -1,
}
for key, val := range node.Status.Capacity {
resource := string(key)
if resource == resourceName {
result.Capacity = int(val.Value())
break
}
}
for key, val := range node.Status.Allocatable {
resource := string(key)
if resource == resourceName {
result.Allocatable = int(val.Value())
break
}
}
return result
}
var createPod = func(resourceName string, quantity int) *v1.Pod {
ginkgo.GinkgoHelper()
rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyAlways,
Containers: []v1.Container{{
Image: busyboxImage,
Name: "container-1",
Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)},
Resources: v1.ResourceRequirements{
Limits: rl,
Requests: rl,
},
}},
},
}
return pod
}
var createPodWrongImage = func(resourceName string, quantity int) *v1.Pod {
ginkgo.GinkgoHelper()
rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyAlways,
Containers: []v1.Container{{
Image: imageutils.GetE2EImage(imageutils.InvalidRegistryImage),
ImagePullPolicy: v1.PullAlways, // ensures the test does not fail the non-pre-pulled image validation
Name: "container-1",
Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)},
Resources: v1.ResourceRequirements{
Limits: rl,
Requests: rl,
},
}},
},
}
return pod
}
nodeStatusUpdateTimeout := 1 * time.Minute
devicePluginUpdateTimeout := 1 * time.Minute
ginkgo.It("will report a Healthy and then Unhealthy single device in the pod status", func(ctx context.Context) {
// randomizing so tests can run in parallel
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
plugin := testdeviceplugin.NewDevicePlugin(nil)
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
defer plugin.Stop() // should stop even if registration failed
gomega.Expect(err).To(gomega.Succeed())
ginkgo.By("initial state: capacity and allocatable are set")
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
// schedule a pod that requests the device
client := e2epod.NewPodClient(f)
pod := client.Create(ctx, createPod(resourceName, 1))
// wait for the pod to be running
gomega.Expect(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)).To(gomega.Succeed())
expectedStatus := []v1.ResourceStatus{
{
Name: v1.ResourceName(resourceName),
Resources: []v1.ResourceHealth{
{
ResourceID: "testdevice",
Health: v1.ResourceHealthStatusHealthy,
},
},
},
}
gomega.Eventually(func() []v1.ResourceStatus {
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus
}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus))
// now make the device unhealthy
devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
plugin.UpdateDevices(devices)
expectedStatus[0].Resources[0] = v1.ResourceHealth{
ResourceID: "testdevice",
Health: v1.ResourceHealthStatusUnhealthy,
}
gomega.Eventually(func() []v1.ResourceStatus {
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus
}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus))
// deleting the pod
err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
gomega.Expect(err).To(gomega.Succeed())
waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
})
ginkgo.It("will report a Device Status for the failed pod in the pod status", func(ctx context.Context) {
// randomizing so tests can run in parallel
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
plugin := testdeviceplugin.NewDevicePlugin(nil)
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
defer plugin.Stop() // should stop even if registration failed
gomega.Expect(err).To(gomega.Succeed())
ginkgo.By("initial state: capacity and allocatable are set")
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
// schedule a pod that requests the device
client := e2epod.NewPodClient(f)
pod := client.Create(ctx, createPodWrongImage(resourceName, 1))
// wait for the pod to fail with ImagePullBackOff
gomega.Expect(e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "Back-off pulling image", f.Timeouts.PodStartShort,
func(pod *v1.Pod) (bool, error) {
if pod.Status.Phase == v1.PodPending &&
len(pod.Status.ContainerStatuses) > 0 &&
pod.Status.ContainerStatuses[0].State.Waiting != nil &&
pod.Status.ContainerStatuses[0].State.Waiting.Reason == "ImagePullBackOff" {
return true, nil
}
return false, nil
})).To(gomega.Succeed())
expectedStatus := []v1.ResourceStatus{
{
Name: v1.ResourceName(resourceName),
Resources: []v1.ResourceHealth{
{
ResourceID: "testdevice",
Health: v1.ResourceHealthStatusHealthy,
},
},
},
}
gomega.Eventually(func() []v1.ResourceStatus {
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus
}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus))
// now make the device unhealthy
devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
plugin.UpdateDevices(devices)
expectedStatus[0].Resources[0] = v1.ResourceHealth{
ResourceID: "testdevice",
Health: "Unhealthy",
}
gomega.Eventually(func() []v1.ResourceStatus {
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus
}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus))
// deleting the pod
err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
gomega.Expect(err).To(gomega.Succeed())
waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
})
})

View File

@ -36,16 +36,15 @@ import (
"k8s.io/kubernetes/test/e2e_node/testdeviceplugin"
)
type ResourceValue struct {
Allocatable int
Capacity int
}
// Serial because the test restarts Kubelet
var _ = SIGDescribe("Device Plugin Failures:", framework.WithNodeConformance(), func() {
f := framework.NewDefaultFramework("device-plugin-failures")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
type ResourceValue struct {
Allocatable int
Capacity int
}
var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue {
ginkgo.GinkgoHelper()
node := getLocalNode(ctx, f)