Track actuated resources in the allocation manager

This commit is contained in:
Tim Allclair 2025-03-05 12:57:21 -08:00
parent 2effa5e3cf
commit 660bd6b42d
2 changed files with 67 additions and 25 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package allocation package allocation
import ( import (
"path/filepath"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -29,7 +31,10 @@ import (
) )
// podStatusManagerStateFile is the file name where status manager stores its state // podStatusManagerStateFile is the file name where status manager stores its state
const podStatusManagerStateFile = "pod_status_manager_state" const (
allocatedPodsStateFile = "allocated_pods_state"
actuatedPodsStateFile = "actuated_pods_state"
)
// AllocationManager tracks pod resource allocations. // AllocationManager tracks pod resource allocations.
type Manager interface { type Manager interface {
@ -42,56 +47,70 @@ type Manager interface {
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers. // SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error SetPodAllocation(allocatedPod *v1.Pod) error
// DeletePodAllocation removes any stored state for the given pod UID. // SetActuatedResources records the actuated resources of the given container (or the entire
DeletePodAllocation(uid types.UID) // pod, if actuatedContainer is nil).
SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error
// GetActuatedResources returns the stored actuated resources for the container, and whether they exist.
GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
// DeletePod removes any stored state for the given pod UID.
DeletePod(uid types.UID)
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID]) RemoveOrphanedPods(remainingPods sets.Set[types.UID])
} }
type manager struct { type manager struct {
state state.State allocated state.State
actuated state.State
} }
func NewManager(checkpointDirectory string) Manager { func NewManager(checkpointDirectory string) Manager {
m := &manager{} return &manager{
allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile),
actuated: newStateImpl(checkpointDirectory, actuatedPodsStateFile),
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { func newStateImpl(checkpointDirectory, checkpointName string) state.State {
stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile) if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return state.NewNoopStateCheckpoint()
}
stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, checkpointName)
if err != nil { if err != nil {
// This is a crictical, non-recoverable failure. // This is a critical, non-recoverable failure.
klog.ErrorS(err, "Failed to initialize allocation checkpoint manager") klog.ErrorS(err, "Failed to initialize allocation checkpoint manager",
"checkpointPath", filepath.Join(checkpointDirectory, checkpointName))
panic(err) panic(err)
} }
m.state = stateImpl
} else {
m.state = state.NewNoopStateCheckpoint()
}
return m return stateImpl
} }
// NewInMemoryManager returns an allocation manager that doesn't persist state. // NewInMemoryManager returns an allocation manager that doesn't persist state.
// For testing purposes only! // For testing purposes only!
func NewInMemoryManager() Manager { func NewInMemoryManager() Manager {
return &manager{ return &manager{
state: state.NewStateMemory(nil), allocated: state.NewStateMemory(nil),
actuated: state.NewStateMemory(nil),
} }
} }
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false // If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
return m.state.GetContainerResourceAllocation(podUID, containerName) return m.allocated.GetContainerResourceAllocation(podUID, containerName)
} }
// UpdatePodFromAllocation overwrites the pod spec with the allocation. // UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed. // This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
// TODO(tallclair): This clones the whole cache, but we only need 1 pod. // TODO(tallclair): This clones the whole cache, but we only need 1 pod.
allocs := m.state.GetPodResourceAllocation() allocs := m.allocated.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs) return updatePodFromAllocation(pod, allocs)
} }
@ -134,6 +153,10 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
// SetPodAllocation checkpoints the resources allocated to a pod's containers // SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error { func (m *manager) SetPodAllocation(pod *v1.Pod) error {
return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod))
}
func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
podAlloc := make(map[string]v1.ResourceRequirements) podAlloc := make(map[string]v1.ResourceRequirements)
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
alloc := *container.Resources.DeepCopy() alloc := *container.Resources.DeepCopy()
@ -149,16 +172,35 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
} }
} }
return m.state.SetPodResourceAllocation(pod.UID, podAlloc) return podAlloc
} }
func (m *manager) DeletePodAllocation(uid types.UID) { func (m *manager) DeletePod(uid types.UID) {
if err := m.state.Delete(uid, ""); err != nil { if err := m.allocated.Delete(uid, ""); err != nil {
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
}
if err := m.actuated.Delete(uid, ""); err != nil {
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
} }
} }
func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
m.state.RemoveOrphanedPods(remainingPods) m.allocated.RemoveOrphanedPods(remainingPods)
m.actuated.RemoveOrphanedPods(remainingPods)
}
func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error {
if actuatedContainer == nil {
alloc := allocationFromPod(allocatedPod)
return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc)
}
return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources)
}
func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
return m.actuated.GetContainerResourceAllocation(podUID, containerName)
} }

View File

@ -2722,7 +2722,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
start := kl.clock.Now() start := kl.clock.Now()
for _, pod := range pods { for _, pod := range pods {
kl.podManager.RemovePod(pod) kl.podManager.RemovePod(pod)
kl.allocationManager.DeletePodAllocation(pod.UID) kl.allocationManager.DeletePod(pod.UID)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror { if wasMirror {