kube-proxy: merge NodeEligibleHandler with NodeManager

ProxyHealthServer now consumes NodeManager to fetch the latest node
object when determining node eligibility.

Signed-off-by: Daman Arora <aroradaman@gmail.com>
Author: Daman Arora
Date:   2025-05-11 22:06:30 +05:30
Parent: c6735d9b3b
Commit: fa9e466945
5 changed files with 103 additions and 85 deletions
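
The change is easiest to read as a switch from push to pull: the removed NodeEligibleHandler pushed every node event into the health server through SyncNode(), which cached a nodeEligible flag, whereas the reworked ProxyHealthServer keeps a reference to NodeManager and pulls the latest node object each time a health check runs. The minimal sketch below condenses that new pull path; nodeSource, staticNode and nodeEligible are illustrative stand-ins rather than the actual kube-proxy declarations, though the eligibility rules follow the NodeEligible() body in the diff.

// Sketch only: nodeSource, staticNode and nodeEligible are illustrative
// stand-ins, not the real kube-proxy declarations.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// Taint key the cluster autoscaler places on nodes it is about to delete.
const ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"

// nodeSource stands in for *proxy.NodeManager: anything that can return the
// most recently observed node object.
type nodeSource interface {
	Node() *v1.Node
}

// nodeEligible mirrors the reworked ProxyHealthServer.NodeEligible: instead
// of caching a flag updated by SyncNode, it pulls the current node on every
// call and checks the deletion timestamp and the ToBeDeletedTaint.
func nodeEligible(src nodeSource) bool {
	node := src.Node()
	if !node.DeletionTimestamp.IsZero() {
		return false
	}
	for _, taint := range node.Spec.Taints {
		if taint.Key == ToBeDeletedTaint {
			return false
		}
	}
	return true
}

// staticNode is a trivial nodeSource used only for the demonstration below.
type staticNode struct{ n *v1.Node }

func (s staticNode) Node() *v1.Node { return s.n.DeepCopy() }

func main() {
	healthy := &v1.Node{}
	doomed := &v1.Node{}
	doomed.Spec.Taints = []v1.Taint{{Key: ToBeDeletedTaint}}
	fmt.Println(nodeEligible(staticNode{healthy})) // true
	fmt.Println(nodeEligible(staticNode{doomed}))  // false
}

Pulling a deep copy on demand lets the extra event handler and the cached flag go away; the trade-off is re-checking the taint list on every probe, which is cheap.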


@@ -241,7 +241,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
}
if len(config.HealthzBindAddress) > 0 {
s.HealthzServer = healthcheck.NewProxyHealthServer(config.HealthzBindAddress, 2*config.SyncPeriod.Duration)
s.HealthzServer = healthcheck.NewProxyHealthServer(config.HealthzBindAddress, 2*config.SyncPeriod.Duration, s.NodeManager)
}
err = s.platformSetup(ctx)
@@ -606,9 +606,6 @@ func (s *ProxyServer) Run(ctx context.Context) error {
// hollow-proxy doesn't need node config, and we don't create nodeManager for hollow-proxy.
if s.NodeManager != nil {
nodeConfig := config.NewNodeConfig(ctx, s.NodeManager.NodeInformer(), s.Config.ConfigSyncPeriod.Duration)
nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
HealthServer: s.HealthzServer,
})
nodeConfig.RegisterEventHandler(s.NodeManager)
nodeTopologyConfig := config.NewNodeTopologyConfig(ctx, s.NodeManager.NodeInformer(), s.Config.ConfigSyncPeriod.Duration)
nodeTopologyConfig.RegisterEventHandler(s.Proxier)


@@ -27,21 +27,24 @@ import (
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
clientsetfake "k8s.io/client-go/kubernetes/fake"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
)
const testNodeName = "test-node"
type fakeListener struct {
openPorts sets.Set[string]
}
@@ -431,7 +434,16 @@ func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int,
type nodeTweak func(n *v1.Node)
func makeNode(tweaks ...nodeTweak) *v1.Node {
n := &v1.Node{}
n := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{{
Type: v1.NodeInternalIP, Address: "192.168.0.1",
}},
},
}
for _, tw := range tweaks {
tw(n)
}
@@ -464,8 +476,10 @@ func TestHealthzServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
client := clientsetfake.NewClientset(makeNode())
hs := newProxyHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
nodeManager, _ := proxy.NewNodeManager(context.TODO(), client, time.Second, testNodeName, false)
hs := newProxyHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nodeManager)
server := hs.httpFactory.New(healthzHandler{hs: hs})
hsTest := &serverTest{
@@ -481,7 +495,7 @@ func TestHealthzServer(t *testing.T) {
testProxyHealthUpdater(hs, hsTest, fakeClock, ptr.To(true), t)
// Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other")))
nodeManager.OnNodeUpdate(nil, makeNode(tweakTainted("other")))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -495,7 +509,7 @@ func TestHealthzServer(t *testing.T) {
testHTTPHandler(hsTest, http.StatusOK, expectedPayload, t)
// Should return 503 "ServiceUnavailable" if we've synced a ToBeDeletedTaint node
hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
nodeManager.OnNodeUpdate(nil, makeNode(tweakTainted(ToBeDeletedTaint)))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -509,7 +523,7 @@ func TestHealthzServer(t *testing.T) {
testHTTPHandler(hsTest, http.StatusServiceUnavailable, expectedPayload, t)
// Should return 200 "OK" if we've synced a node, tainted in any other way
hs.SyncNode(makeNode(tweakTainted("other")))
nodeManager.OnNodeUpdate(nil, makeNode(tweakTainted("other")))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -523,7 +537,7 @@ func TestHealthzServer(t *testing.T) {
testHTTPHandler(hsTest, http.StatusOK, expectedPayload, t)
// Should return 503 "ServiceUnavailable" if we've synced a deleted node
hs.SyncNode(makeNode(tweakDeleted()))
nodeManager.OnNodeUpdate(nil, makeNode(tweakDeleted()))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -542,8 +556,10 @@ func TestLivezServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
client := clientsetfake.NewClientset(makeNode())
hs := newProxyHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
nodeManager, _ := proxy.NewNodeManager(context.TODO(), client, time.Second, testNodeName, false)
hs := newProxyHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nodeManager)
server := hs.httpFactory.New(livezHandler{hs: hs})
hsTest := &serverTest{
@@ -559,7 +575,7 @@ func TestLivezServer(t *testing.T) {
testProxyHealthUpdater(hs, hsTest, fakeClock, nil, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other")))
nodeManager.OnNodeUpdate(nil, makeNode(tweakTainted("other")))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -572,7 +588,7 @@ func TestLivezServer(t *testing.T) {
testHTTPHandler(hsTest, http.StatusOK, expectedPayload, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
nodeManager.OnNodeUpdate(nil, makeNode(tweakTainted(ToBeDeletedTaint)))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -585,7 +601,7 @@ func TestLivezServer(t *testing.T) {
testHTTPHandler(hsTest, http.StatusOK, expectedPayload, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakTainted("other")))
nodeManager.OnNodeUpdate(nil, makeNode(tweakTainted("other")))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),
@@ -598,7 +614,7 @@ func TestLivezServer(t *testing.T) {
testHTTPHandler(hsTest, http.StatusOK, expectedPayload, t)
// Should return 200 "OK" irrespective of node syncs
hs.SyncNode(makeNode(tweakDeleted()))
nodeManager.OnNodeUpdate(nil, makeNode(tweakDeleted()))
expectedPayload = ProxyHealth{
CurrentTime: fakeClock.Now(),
LastUpdated: fakeClock.Now(),


@@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/metrics"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
@@ -70,34 +71,32 @@ type ProxyHealthServer struct {
httpFactory httpServerFactory
clock clock.Clock
nodeManager *proxy.NodeManager
addr string
healthTimeout time.Duration
lock sync.RWMutex
lastUpdatedMap map[v1.IPFamily]time.Time
oldestPendingQueuedMap map[v1.IPFamily]time.Time
nodeEligible bool
}
// NewProxyHealthServer returns a proxy health http server.
func NewProxyHealthServer(addr string, healthTimeout time.Duration) *ProxyHealthServer {
return newProxyHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
func NewProxyHealthServer(addr string, healthTimeout time.Duration, nodeManager *proxy.NodeManager) *ProxyHealthServer {
return newProxyHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, nodeManager)
}
func newProxyHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxyHealthServer {
func newProxyHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, nodeManager *proxy.NodeManager) *ProxyHealthServer {
return &ProxyHealthServer{
listener: listener,
httpFactory: httpServerFactory,
clock: c,
addr: addr,
healthTimeout: healthTimeout,
nodeManager: nodeManager,
lastUpdatedMap: make(map[v1.IPFamily]time.Time),
oldestPendingQueuedMap: make(map[v1.IPFamily]time.Time),
// The node is eligible (and thus the proxy healthy) while it's starting up
// and until we've processed the first node event that indicates the
// contrary.
nodeEligible: true,
}
}
@@ -172,30 +171,22 @@ func (hs *ProxyHealthServer) Health() ProxyHealth {
return health
}
// SyncNode syncs the node and determines if it is eligible or not. Eligible is
// defined as being: not tainted by ToBeDeletedTaint and not deleted.
func (hs *ProxyHealthServer) SyncNode(node *v1.Node) {
// NodeEligible returns if node is eligible or not. Eligible is defined
// as being: not tainted by ToBeDeletedTaint and not deleted.
func (hs *ProxyHealthServer) NodeEligible() bool {
hs.lock.Lock()
defer hs.lock.Unlock()
node := hs.nodeManager.Node()
if !node.DeletionTimestamp.IsZero() {
hs.nodeEligible = false
return
return false
}
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
hs.nodeEligible = false
return
return false
}
}
hs.nodeEligible = true
}
// NodeEligible returns nodeEligible field of ProxyHealthServer.
func (hs *ProxyHealthServer) NodeEligible() bool {
hs.lock.RLock()
defer hs.lock.RUnlock()
return hs.nodeEligible
return true
}
// Run starts the healthz HTTP server and blocks until it exits.


@@ -22,6 +22,7 @@ import (
"net"
"os"
"reflect"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@@ -34,39 +35,17 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilnode "k8s.io/kubernetes/pkg/util/node"
)
// NodeEligibleHandler handles the life cycle of the Node's eligibility, as
// determined by the health server for directing load balancer traffic.
type NodeEligibleHandler struct {
HealthServer *healthcheck.ProxyHealthServer
}
var _ config.NodeHandler = &NodeEligibleHandler{}
// OnNodeAdd is a handler for Node creates.
func (n *NodeEligibleHandler) OnNodeAdd(node *v1.Node) { n.HealthServer.SyncNode(node) }
// OnNodeUpdate is a handler for Node updates.
func (n *NodeEligibleHandler) OnNodeUpdate(_, node *v1.Node) { n.HealthServer.SyncNode(node) }
// OnNodeDelete is a handler for Node deletes.
func (n *NodeEligibleHandler) OnNodeDelete(node *v1.Node) { n.HealthServer.SyncNode(node) }
// OnNodeSynced is a handler for Node syncs.
func (n *NodeEligibleHandler) OnNodeSynced() {}
// NodeManager handles the life cycle of kube-proxy based on the NodeIPs and PodCIDRs handles
// node watch events and crashes kube-proxy if there are any changes in NodeIPs or PodCIDRs.
// Note: It only crashes on change on PodCIDR when watchPodCIDRs is set to true.
type NodeManager struct {
mu sync.Mutex
node *v1.Node
nodeInformer v1informers.NodeInformer
nodeLister corelisters.NodeLister
nodeIPs []net.IP
podCIDRs []string
watchPodCIDRs bool
exitFunc func(exitCode int)
}
@@ -101,8 +80,6 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter
var node *v1.Node
var err error
var nodeIPs []net.IP
var podCIDRs []string
// wait for the node object to exist and have NodeIPs and PodCIDRs
ctx, cancel := context.WithTimeout(ctx, pollTimeout)
@@ -113,19 +90,15 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter
return false, nil
}
nodeIPs, err = utilnode.GetNodeHostIPs(node)
_, err = utilnode.GetNodeHostIPs(node)
if err != nil {
return false, nil
}
// we only wait for PodCIDRs if NodeManager is configured with watchPodCIDRs
if watchPodCIDRs && len(podCIDRs) == 0 {
if len(node.Spec.PodCIDRs) > 0 {
podCIDRs = node.Spec.PodCIDRs
} else {
err = fmt.Errorf("node %q does not have any PodCIDR allocated", nodeName)
return false, nil
}
if watchPodCIDRs && len(node.Spec.PodCIDRs) == 0 {
err = fmt.Errorf("node %q does not have any PodCIDR allocated", nodeName)
return false, nil
}
return true, nil
})
@@ -137,8 +110,7 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter
return &NodeManager{
nodeInformer: nodeInformer,
nodeLister: nodeLister,
nodeIPs: nodeIPs,
podCIDRs: podCIDRs,
node: node,
watchPodCIDRs: watchPodCIDRs,
exitFunc: exitFunc,
}, nil
@@ -146,12 +118,17 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter
// NodeIPs returns the NodeIPs polled in NewNodeManager().
func (n *NodeManager) NodeIPs() []net.IP {
return n.nodeIPs
n.mu.Lock()
defer n.mu.Unlock()
nodeIPs, _ := utilnode.GetNodeHostIPs(n.node)
return nodeIPs
}
// PodCIDRs returns the PodCIDRs polled in NewNodeManager().
func (n *NodeManager) PodCIDRs() []string {
return n.podCIDRs
n.mu.Lock()
defer n.mu.Unlock()
return n.node.Spec.PodCIDRs
}
// NodeInformer returns the NodeInformer.
@@ -171,13 +148,19 @@ func (n *NodeManager) OnNodeUpdate(_, node *v1.Node) {
// onNodeChange functions helps to implement OnNodeAdd and OnNodeUpdate.
func (n *NodeManager) onNodeChange(node *v1.Node) {
// update the node object
n.mu.Lock()
oldNodeIPs, _ := utilnode.GetNodeHostIPs(n.node)
oldPodCIDRs := n.node.Spec.PodCIDRs
n.node = node
n.mu.Unlock()
// We exit whenever there is a change in PodCIDRs detected initially, and PodCIDRs received
// on node watch event if the node manager is configured with watchPodCIDRs.
if n.watchPodCIDRs {
podCIDRs := node.Spec.PodCIDRs
if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
if !reflect.DeepEqual(oldPodCIDRs, node.Spec.PodCIDRs) {
klog.InfoS("PodCIDRs changed for the node",
"node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs)
"node", klog.KObj(node), "newPodCIDRs", node.Spec.PodCIDRs, "oldPodCIDRs", oldPodCIDRs)
klog.Flush()
n.exitFunc(1)
}
@@ -191,9 +174,9 @@ func (n *NodeManager) onNodeChange(node *v1.Node) {
// We exit whenever there is a change in NodeIPs detected initially, and NodeIPs received
// on node watch event.
if !reflect.DeepEqual(n.nodeIPs, nodeIPs) {
if !reflect.DeepEqual(oldNodeIPs, nodeIPs) {
klog.InfoS("NodeIPs changed for the node",
"node", klog.KObj(node), "newNodeIPs", nodeIPs, "oldNodeIPs", n.nodeIPs)
"node", klog.KObj(node), "newNodeIPs", nodeIPs, "oldNodeIPs", oldNodeIPs)
klog.Flush()
n.exitFunc(1)
}
@@ -208,3 +191,10 @@ func (n *NodeManager) OnNodeDelete(node *v1.Node) {
// OnNodeSynced is called after the cache is synced and all pre-existing Nodes have been reported
func (n *NodeManager) OnNodeSynced() {}
// Node returns the deep copy of the latest node object.
func (n *NodeManager) Node() *v1.Node {
n.mu.Lock()
defer n.mu.Unlock()
return n.node.DeepCopy()
}


@@ -65,6 +65,12 @@ func tweakPodCIDRs(podCIDRs ...string) nodeTweak {
}
}
func tweakResourceVersion(resourceVersion string) nodeTweak {
return func(n *v1.Node) {
n.ResourceVersion = resourceVersion
}
}
func TestNewNodeManager(t *testing.T) {
testCases := []struct {
name string
@@ -301,3 +307,21 @@ func TestNodeManagerOnNodeDelete(t *testing.T) {
nodeManager.OnNodeDelete(makeNode())
require.Equal(t, ptr.To(1), exitCode)
}
func TestNodeManagerNode(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
client := clientsetfake.NewClientset()
_, _ = client.CoreV1().Nodes().Create(ctx, makeNode(
tweakNodeIPs("192.168.1.1"),
tweakResourceVersion("1")),
metav1.CreateOptions{})
nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, false, func(i int) {}, time.Nanosecond, time.Nanosecond)
require.NoError(t, err)
require.Equal(t, "1", nodeManager.Node().ResourceVersion)
nodeManager.OnNodeUpdate(nil, makeNode(tweakResourceVersion("2")))
require.NoError(t, err)
require.Equal(t, "2", nodeManager.Node().ResourceVersion)
}