Merge pull request #123977 from wojtek-t/avoid_node_gets

Get node from local cache instead of kube-apiserver cache for kubelet status updates
This commit is contained in:
Kubernetes Prow Robot 2024-04-18 02:10:44 -07:00 committed by GitHub
commit ca1a134a41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 47 additions and 13 deletions

View File

@ -40,7 +40,6 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util"
taintutil "k8s.io/kubernetes/pkg/util/taints"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -546,15 +545,19 @@ func (kl *Kubelet) updateNodeStatus(ctx context.Context) error {
func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
// To reduce the load on etcd, we are serving GET operations from
// apiserver cache (the data might be slightly delayed but it doesn't
// To reduce the load on control-plane, we are serving GET operations from
// local lister (the data might be slightly delayed but it doesn't
// seem to cause more conflict - the delays are pretty small).
// If it results in a conflict, all retries are served directly from etcd.
opts := metav1.GetOptions{}
var originalNode *v1.Node
var err error
if tryNumber == 0 {
util.FromApiserverCache(&opts)
originalNode, err = kl.nodeLister.Get(string(kl.nodeName))
} else {
opts := metav1.GetOptions{}
originalNode, err = kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
}
originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}

View File

@ -39,6 +39,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/strategicpatch"
@ -160,6 +161,27 @@ func (lcm *localCM) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceL
return lcm.capacity
}
// delegatingNodeLister is a test double that satisfies the kubelet's node
// lister by forwarding every Get/List to the supplied clientset. This lets
// tests that stub reactions on a fake client also observe the lookups that
// tryUpdateNodeStatus now serves from the local lister.
type delegatingNodeLister struct {
// client is the (typically fake) clientset all lister calls delegate to.
client clientset.Interface
}
// Get retrieves the node with the given name by delegating to the backing
// client with default get options.
func (l delegatingNodeLister) Get(name string) (*v1.Node, error) {
	node, err := l.client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	return node, nil
}
// List returns all nodes matching selector by delegating to the backing
// client. A nil selector lists everything. The returned pointers alias the
// items of the fetched NodeList.
func (l delegatingNodeLister) List(selector labels.Selector) (ret []*v1.Node, err error) {
	opts := metav1.ListOptions{}
	if selector != nil {
		opts.LabelSelector = selector.String()
	}
	nodeList, err := l.client.CoreV1().Nodes().List(context.Background(), opts)
	if err != nil {
		return nil, err
	}
	// Fix: the original allocated a slice of len(nodeList.Items) nil
	// pointers and returned it without ever filling it; populate it from
	// the list result. Take the address of the slice element (not of a
	// range variable copy) so each pointer refers to a distinct item.
	nodes := make([]*v1.Node, 0, len(nodeList.Items))
	for i := range nodeList.Items {
		nodes = append(nodes, &nodeList.Items[i])
	}
	return nodes, nil
}
func TestUpdateNewNodeStatus(t *testing.T) {
cases := []struct {
desc string
@ -211,6 +233,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -390,6 +413,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
},
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -602,6 +626,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -824,6 +849,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain
kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -1108,6 +1134,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain
kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
// Execute
assert.NoError(t, kubelet.updateNodeStatus(ctx))
@ -1260,11 +1287,15 @@ func TestFastStatusUpdateOnce(t *testing.T) {
return
}
// patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
require.Len(t, actions, 2*tc.wantPatches-1)
// patch, then patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
expectedActions := 2*tc.wantPatches - 2
if tc.wantPatches == 1 {
expectedActions = 1
}
require.Len(t, actions, expectedActions)
for i, action := range actions {
if i%2 == 1 {
if i%2 == 0 && i > 0 {
require.IsType(t, core.GetActionImpl{}, action)
continue
}
@ -1566,11 +1597,11 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
kubelet.updateRuntimeUp()
assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
require.Equal(t, actions[1].GetSubresource(), "status")
require.Len(t, actions, 1)
require.True(t, actions[0].Matches("patch", "nodes"))
require.Equal(t, actions[0].GetSubresource(), "status")
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[0].(core.PatchActionImpl).GetPatch())
assert.NoError(t, err)
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable), "%s", cmp.Diff(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable))
}