Extract pod allocation manager from status manager
parent 81e54a2a01
commit 84ec78ede7

pkg/kubelet/allocation/allocation_manager.go (new file, 160 lines)
@@ -0,0 +1,160 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package allocation

import (
	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/types"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
)

// podStatusManagerStateFile is the file name where status manager stores its state
const podStatusManagerStateFile = "pod_status_manager_state"

// AllocationManager tracks pod resource allocations.
type Manager interface {
	// GetContainerResourceAllocation returns the AllocatedResources value for the container
	GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)

	// UpdatePodFromAllocation overwrites the pod spec with the allocation.
	// This function does a deep copy only if updates are needed.
	// Returns the updated (or original) pod, and whether there was an allocation stored.
	UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)

	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
	SetPodAllocation(pod *v1.Pod) error

	// DeletePodAllocation removes any stored state for the given pod UID.
	DeletePodAllocation(uid types.UID) error

	// RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
	RemoveOrphanedPods(remainingPods map[types.UID]bool)
}

type manager struct {
	state state.State
}

func NewManager(checkpointDirectory string) Manager {
	m := &manager{}

	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile)
		if err != nil {
			// This is a critical, non-recoverable failure.
			klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
			panic(err)
		}
		m.state = stateImpl
	} else {
		m.state = state.NewNoopStateCheckpoint()
	}

	return m
}

// NewInMemoryManager returns an allocation manager that doesn't persist state.
// For testing purposes only!
func NewInMemoryManager() Manager {
	return &manager{
		state: state.NewStateMemory(nil),
	}
}

// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
	return m.state.GetContainerResourceAllocation(podUID, containerName)
}

// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
	// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
	allocs := m.state.GetPodResourceAllocation()
	return updatePodFromAllocation(pod, allocs)
}

func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
	allocated, found := allocs[string(pod.UID)]
	if !found {
		return pod, false
	}

	updated := false
	containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
		if cAlloc, ok := allocated[c.Name]; ok {
			if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
				// Allocation differs from pod spec, retrieve the allocation
				if !updated {
					// If this is the first update to be performed, copy the pod
					pod = pod.DeepCopy()
					updated = true
				}
				return cAlloc, true
			}
		}
		return v1.ResourceRequirements{}, false
	}

	for i, c := range pod.Spec.Containers {
		if cAlloc, found := containerAlloc(c); found {
			// Allocation differs from pod spec, update
			pod.Spec.Containers[i].Resources = cAlloc
		}
	}
	for i, c := range pod.Spec.InitContainers {
		if cAlloc, found := containerAlloc(c); found {
			// Allocation differs from pod spec, update
			pod.Spec.InitContainers[i].Resources = cAlloc
		}
	}
	return pod, updated
}

// SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
	podAlloc := make(map[string]v1.ResourceRequirements)
	for _, container := range pod.Spec.Containers {
		alloc := *container.Resources.DeepCopy()
		podAlloc[container.Name] = alloc
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
		for _, container := range pod.Spec.InitContainers {
			if podutil.IsRestartableInitContainer(&container) {
				alloc := *container.Resources.DeepCopy()
				podAlloc[container.Name] = alloc
			}
		}
	}

	return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
}

func (m *manager) DeletePodAllocation(uid types.UID) error {
	return m.state.Delete(string(uid), "")
}

func (m *manager) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
	m.state.RemoveOrphanedPods(remainingPods)
}
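As an aside (not part of this commit), here is a minimal sketch of how the new interface could be exercised from inside the Kubernetes tree, using the NewInMemoryManager constructor added above. The pod UID, names, and quantities are made up for illustration, and a real kubelet would call NewManager with a checkpoint directory instead so allocations survive restarts.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/allocation"
)

func main() {
	// In-memory variant (no checkpoint file); the kubelet itself uses
	// allocation.NewManager(checkpointDirectory) for persistence.
	m := allocation.NewInMemoryManager()

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{UID: "pod-uid-1", Name: "demo", Namespace: "default"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "app",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
				},
			}},
		},
	}

	// Checkpoint the resources currently allocated to the pod's containers.
	if err := m.SetPodAllocation(pod); err != nil {
		panic(err)
	}

	// Look up the stored allocation for one container.
	alloc, ok := m.GetContainerResourceAllocation("pod-uid-1", "app")
	fmt.Println(ok, alloc.Requests.Cpu())

	// Overwrite the pod spec with the stored allocation. Nothing differs here,
	// so the second return value is false and the original pod comes back unchanged.
	_, updated := m.UpdatePodFromAllocation(pod)
	fmt.Println(updated)
}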
pkg/kubelet/allocation/allocation_manager_test.go (new file, 165 lines)
@@ -0,0 +1,165 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package allocation

import (
	"testing"

	"github.com/stretchr/testify/assert"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
)

func TestUpdatePodFromAllocation(t *testing.T) {
	containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       "12345",
			Name:      "test",
			Namespace: "default",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name: "c1",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
						},
					},
				},
				{
					Name: "c2",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
						},
					},
				},
			},
			InitContainers: []v1.Container{
				{
					Name: "c1-restartable-init",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
						},
					},
					RestartPolicy: &containerRestartPolicyAlways,
				},
				{
					Name: "c1-init",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
						},
					},
				},
			},
		},
	}

	resizedPod := pod.DeepCopy()
	resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
	resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)

	tests := []struct {
		name         string
		pod          *v1.Pod
		allocs       state.PodResourceAllocation
		expectPod    *v1.Pod
		expectUpdate bool
	}{{
		name: "steady state",
		pod:  pod,
		allocs: state.PodResourceAllocation{
			string(pod.UID): map[string]v1.ResourceRequirements{
				"c1":                  *pod.Spec.Containers[0].Resources.DeepCopy(),
				"c2":                  *pod.Spec.Containers[1].Resources.DeepCopy(),
				"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
				"c1-init":             *pod.Spec.InitContainers[1].Resources.DeepCopy(),
			},
		},
		expectUpdate: false,
	}, {
		name:         "no allocations",
		pod:          pod,
		allocs:       state.PodResourceAllocation{},
		expectUpdate: false,
	}, {
		name: "missing container allocation",
		pod:  pod,
		allocs: state.PodResourceAllocation{
			string(pod.UID): map[string]v1.ResourceRequirements{
				"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
			},
		},
		expectUpdate: false,
	}, {
		name: "resized container",
		pod:  pod,
		allocs: state.PodResourceAllocation{
			string(pod.UID): map[string]v1.ResourceRequirements{
				"c1":                  *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
				"c2":                  *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
				"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
				"c1-init":             *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
			},
		},
		expectUpdate: true,
		expectPod:    resizedPod,
	}}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pod := test.pod.DeepCopy()
			allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)

			if test.expectUpdate {
				assert.True(t, updated, "updated")
				assert.Equal(t, test.expectPod, allocatedPod)
				assert.NotEqual(t, pod, allocatedPod)
			} else {
				assert.False(t, updated, "updated")
				assert.Same(t, pod, allocatedPod)
			}
		})
	}
}
pkg/kubelet/allocation/doc.go (new file, 18 lines)
@@ -0,0 +1,18 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package allocation handles tracking pod resource allocations.
package allocation
@@ -18,6 +18,7 @@ package state

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// PodResourceAllocation type is used in tracking resources allocated to pod's containers
@@ -48,6 +49,8 @@ type writer interface {
	SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
	SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
	Delete(podUID string, containerName string) error
	// RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
	RemoveOrphanedPods(remainingPods map[types.UID]bool)
}

// State interface provides methods for tracking and setting pod resource allocation
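For orientation (not part of the diff): as the new unit test above constructs it, state.PodResourceAllocation is keyed first by pod UID and then by container name. A small sketch, with a made-up UID, container name, and quantity:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
)

func main() {
	// Outer key: pod UID; inner key: container name.
	allocs := state.PodResourceAllocation{
		"pod-uid-1": map[string]v1.ResourceRequirements{
			"app": {
				Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
			},
		},
	}

	req := allocs["pod-uid-1"]["app"].Requests
	fmt.Println(req.Cpu()) // 100m
}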
@@ -22,6 +22,7 @@ import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@@ -139,6 +140,12 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
	return sc.storeState()
}

func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
	sc.cache.RemoveOrphanedPods(remainingPods)
	// Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
	// the orphaned pods will be removed the next time this method is called.
}

type noopStateCheckpoint struct{}

// NewNoopStateCheckpoint creates a dummy state checkpoint manager
@@ -165,3 +172,5 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
	return nil
}

func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ map[types.UID]bool) {}
@@ -20,6 +20,7 @@ import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog/v2"
)

@@ -96,3 +97,14 @@ func (s *stateMemory) Delete(podUID string, containerName string) error {
	s.deleteContainer(podUID, containerName)
	return nil
}

func (s *stateMemory) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
	s.Lock()
	defer s.Unlock()

	for podUID := range s.podAllocation {
		if _, ok := remainingPods[types.UID(podUID)]; !ok {
			delete(s.podAllocation, podUID)
		}
	}
}
@@ -62,6 +62,7 @@ import (
	fakeremote "k8s.io/cri-client/pkg/fake"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
	cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
	"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
@@ -90,7 +91,6 @@ import (
	serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
	"k8s.io/kubernetes/pkg/kubelet/stats"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/kubelet/status/state"
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
	"k8s.io/kubernetes/pkg/kubelet/sysctl"
	"k8s.io/kubernetes/pkg/kubelet/token"
@@ -19,17 +19,14 @@ package status
import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
	"k8s.io/kubernetes/pkg/kubelet/allocation"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

type fakeManager struct {
	state             state.State
	podResizeStatuses map[types.UID]v1.PodResizeStatus
	allocation.Manager
}

func (m *fakeManager) Start() {
@@ -67,41 +64,10 @@ func (m *fakeManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
	return
}

func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
	klog.InfoS("GetContainerResourceAllocation()")
	return m.state.GetContainerResourceAllocation(podUID, containerName)
}

func (m *fakeManager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
	return m.podResizeStatuses[podUID]
}

func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
	allocs := m.state.GetPodResourceAllocation()
	return updatePodFromAllocation(pod, allocs)
}

func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
	klog.InfoS("SetPodAllocation()")
	for _, container := range pod.Spec.Containers {
		alloc := *container.Resources.DeepCopy()
		if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
			return err
		}
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
		for _, container := range pod.Spec.InitContainers {
			if podutil.IsRestartableInitContainer(&container) {
				alloc := *container.Resources.DeepCopy()
				if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
	m.podResizeStatuses[podUID] = resizeStatus
}
@@ -109,7 +75,7 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
// NewFakeManager creates empty/fake memory manager
func NewFakeManager() Manager {
	return &fakeManager{
		state:             state.NewStateMemory(state.PodResourceAllocation{}),
		Manager:           allocation.NewInMemoryManager(),
		podResizeStatuses: make(map[types.UID]v1.PodResizeStatus),
	}
}
@@ -38,7 +38,7 @@ import (
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
	"k8s.io/kubernetes/pkg/kubelet/allocation"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -46,9 +46,6 @@ import (
	statusutil "k8s.io/kubernetes/pkg/util/pod"
)

// podStatusManagerStateFile is the file name where status manager stores its state
const podStatusManagerStateFile = "pod_status_manager_state"

// A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are
// not sent to the API server.
type versionedPodStatus struct {
@@ -82,10 +79,8 @@ type manager struct {
	podDeletionSafety PodDeletionSafetyProvider

	podStartupLatencyHelper PodStartupLatencyStateHelper
	// state allows to save/restore pod resource allocation and tolerate kubelet restarts.
	state state.State
	// stateFileDirectory holds the directory where the state file for checkpoints is held.
	stateFileDirectory string

	allocation.Manager
}

// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
@@ -180,7 +175,7 @@ func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeleti
		apiStatusVersions:       make(map[kubetypes.MirrorPodUID]uint64),
		podDeletionSafety:       podDeletionSafety,
		podStartupLatencyHelper: podStartupLatencyHelper,
		stateFileDirectory:      stateFileDirectory,
		Manager:                 allocation.NewManager(stateFileDirectory),
	}
}
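The hunks above and below replace the status manager's own allocation bookkeeping with an embedded allocation.Manager, wired up in NewManager via allocation.NewManager(stateFileDirectory). For readers unfamiliar with the Go mechanism this relies on, a minimal standalone sketch (hypothetical, simplified types; not part of this commit): embedding an interface in a struct promotes its methods, so existing callers keep compiling while the work is delegated to the embedded implementation.

package main

import "fmt"

// Allocator stands in for allocation.Manager (hypothetical, simplified).
type Allocator interface {
	SetPodAllocation(podName string) error
}

type inMemoryAllocator struct{}

func (inMemoryAllocator) SetPodAllocation(podName string) error {
	fmt.Println("checkpointing allocation for", podName)
	return nil
}

// statusManager embeds the Allocator interface; Allocator's methods are
// promoted onto statusManager, so calls delegate to the embedded value.
type statusManager struct {
	Allocator
}

func main() {
	m := statusManager{Allocator: inMemoryAllocator{}}
	_ = m.SetPodAllocation("default/demo") // promoted method, delegates to the embedded Allocator
}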
@@ -204,20 +199,6 @@ func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
}

func (m *manager) Start() {
	// Initialize m.state to no-op state checkpoint manager
	m.state = state.NewNoopStateCheckpoint()

	// Create pod allocation checkpoint manager even if client is nil so as to allow local get/set of AllocatedResources & Resize
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		stateImpl, err := state.NewStateCheckpoint(m.stateFileDirectory, podStatusManagerStateFile)
		if err != nil {
			// This is a critical, non-recoverable failure.
			klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
			panic(err)
		}
		m.state = stateImpl
	}

	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
@@ -246,61 +227,6 @@ func (m *manager) Start() {
	}, 0)
}

// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
	m.podStatusesLock.RLock()
	defer m.podStatusesLock.RUnlock()
	return m.state.GetContainerResourceAllocation(podUID, containerName)
}

// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
	m.podStatusesLock.RLock()
	defer m.podStatusesLock.RUnlock()
	// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
	allocs := m.state.GetPodResourceAllocation()
	return updatePodFromAllocation(pod, allocs)
}

func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
	allocated, found := allocs[string(pod.UID)]
	if !found {
		return pod, false
	}

	updated := false
	containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
		if cAlloc, ok := allocated[c.Name]; ok {
			if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
				// Allocation differs from pod spec, retrieve the allocation
				if !updated {
					// If this is the first update to be performed, copy the pod
					pod = pod.DeepCopy()
					updated = true
				}
				return cAlloc, true
			}
		}
		return v1.ResourceRequirements{}, false
	}

	for i, c := range pod.Spec.Containers {
		if cAlloc, found := containerAlloc(c); found {
			// Allocation differs from pod spec, update
			pod.Spec.Containers[i].Resources = cAlloc
		}
	}
	for i, c := range pod.Spec.InitContainers {
		if cAlloc, found := containerAlloc(c); found {
			// Allocation differs from pod spec, update
			pod.Spec.InitContainers[i].Resources = cAlloc
		}
	}
	return pod, updated
}

// GetPodResizeStatus returns the last cached ResizeStatus value.
func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
	m.podStatusesLock.RLock()
@@ -308,29 +234,6 @@ func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
	return m.podResizeStatuses[podUID]
}

// SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
	m.podStatusesLock.RLock()
	defer m.podStatusesLock.RUnlock()

	podAlloc := make(map[string]v1.ResourceRequirements)
	for _, container := range pod.Spec.Containers {
		alloc := *container.Resources.DeepCopy()
		podAlloc[container.Name] = alloc
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
		for _, container := range pod.Spec.InitContainers {
			if podutil.IsRestartableInitContainer(&container) {
				alloc := *container.Resources.DeepCopy()
				podAlloc[container.Name] = alloc
			}
		}
	}

	return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
}

// SetPodResizeStatus checkpoints the last resizing decision for the pod.
func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
	m.podStatusesLock.Lock()
@@ -806,7 +709,7 @@ func (m *manager) deletePodStatus(uid types.UID) {
	m.podStartupLatencyHelper.DeletePodStartupState(uid)
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		delete(m.podResizeStatuses, uid)
		m.state.Delete(string(uid), "")
		m.Manager.DeletePodAllocation(uid)
	}
}

@@ -820,10 +723,12 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
			delete(m.podStatuses, key)
			if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
				delete(m.podResizeStatuses, key)
				m.state.Delete(string(key), "")
			}
		}
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		m.Manager.RemoveOrphanedPods(podUIDs)
	}
}

// syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
@@ -32,7 +32,6 @@ import (

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -41,7 +40,6 @@ import (
	core "k8s.io/client-go/testing"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
@@ -2036,143 +2034,6 @@ func TestMergePodStatus(t *testing.T) {

}

func TestUpdatePodFromAllocation(t *testing.T) {
	containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       "12345",
			Name:      "test",
			Namespace: "default",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name: "c1",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
						},
					},
				},
				{
					Name: "c2",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
						},
					},
				},
			},
			InitContainers: []v1.Container{
				{
					Name: "c1-restartable-init",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
						},
					},
					RestartPolicy: &containerRestartPolicyAlways,
				},
				{
					Name: "c1-init",
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
						},
						Limits: v1.ResourceList{
							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
						},
					},
				},
			},
		},
	}

	resizedPod := pod.DeepCopy()
	resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
	resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)

	tests := []struct {
		name         string
		pod          *v1.Pod
		allocs       state.PodResourceAllocation
		expectPod    *v1.Pod
		expectUpdate bool
	}{{
		name: "steady state",
		pod:  pod,
		allocs: state.PodResourceAllocation{
			string(pod.UID): map[string]v1.ResourceRequirements{
				"c1":                  *pod.Spec.Containers[0].Resources.DeepCopy(),
				"c2":                  *pod.Spec.Containers[1].Resources.DeepCopy(),
				"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
				"c1-init":             *pod.Spec.InitContainers[1].Resources.DeepCopy(),
			},
		},
		expectUpdate: false,
	}, {
		name:         "no allocations",
		pod:          pod,
		allocs:       state.PodResourceAllocation{},
		expectUpdate: false,
	}, {
		name: "missing container allocation",
		pod:  pod,
		allocs: state.PodResourceAllocation{
			string(pod.UID): map[string]v1.ResourceRequirements{
				"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
			},
		},
		expectUpdate: false,
	}, {
		name: "resized container",
		pod:  pod,
		allocs: state.PodResourceAllocation{
			string(pod.UID): map[string]v1.ResourceRequirements{
				"c1":                  *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
				"c2":                  *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
				"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
				"c1-init":             *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
			},
		},
		expectUpdate: true,
		expectPod:    resizedPod,
	}}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pod := test.pod.DeepCopy()
			allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)

			if test.expectUpdate {
				assert.True(t, updated, "updated")
				assert.Equal(t, test.expectPod, allocatedPod)
				assert.NotEqual(t, pod, allocatedPod)
			} else {
				assert.False(t, updated, "updated")
				assert.Same(t, pod, allocatedPod)
			}
		})
	}
}

func statusEqual(left, right v1.PodStatus) bool {
	left.Conditions = nil
	right.Conditions = nil