diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index b69033fe2c7..761e6d5be02 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -66,7 +66,6 @@ import ( "k8s.io/kube-proxy/config/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/cluster/ports" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/apis" @@ -544,6 +543,7 @@ type ProxyServer struct { OOMScoreAdj *int32 ConfigSyncPeriod time.Duration HealthzServer healthcheck.ProxierHealthUpdater + localDetectorMode kubeproxyconfig.LocalMode } // createClients creates a kube client and an event client from the given config and masterOverride. @@ -758,20 +758,23 @@ func (s *ProxyServer) Run() error { // function must configure its shared informer event handlers first. informerFactory.Start(wait.NeverStop) - if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - // Make an informer that selects for our nodename. - currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, - informers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String() - })) - nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod) - nodeConfig.RegisterEventHandler(s.Proxier) - go nodeConfig.Run(wait.NeverStop) - - // This has to start after the calls to NewNodeConfig because that must - // configure the shared informer event handler first. - currentNodeInformerFactory.Start(wait.NeverStop) + // Make an informer that selects for our nodename. 
+ currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String() + })) + nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod) + // https://issues.k8s.io/111321 + if s.localDetectorMode == kubeproxyconfig.LocalModeNodeCIDR { + nodeConfig.RegisterEventHandler(&proxy.NodePodCIDRHandler{}) } + nodeConfig.RegisterEventHandler(s.Proxier) + + go nodeConfig.Run(wait.NeverStop) + + // This has to start after the calls to NewNodeConfig because that must + // configure the shared informer event handler first. + currentNodeInformerFactory.Start(wait.NeverStop) // Birth Cry after the birth is successful s.birthCry() diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index e938cdfab7b..460dd2327eb 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -338,6 +338,7 @@ func newProxyServer( OOMScoreAdj: config.OOMScoreAdj, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, HealthzServer: healthzServer, + localDetectorMode: detectLocalMode, }, nil } @@ -359,10 +360,20 @@ func waitForPodCIDR(client clientset.Interface, nodeName string) (*v1.Node, erro }, } condition := func(event watch.Event) (bool, error) { - if n, ok := event.Object.(*v1.Node); ok { - return n.Spec.PodCIDR != "" && len(n.Spec.PodCIDRs) > 0, nil + // don't process delete events + if event.Type != watch.Modified && event.Type != watch.Added { + return false, nil } - return false, fmt.Errorf("event object not of type Node") + + n, ok := event.Object.(*v1.Node) + if !ok { + return false, fmt.Errorf("event object not of type Node") + } + // don't consider the node if is going to be deleted and keep waiting + if !n.DeletionTimestamp.IsZero() { + return false, nil + } + return n.Spec.PodCIDR 
!= "" && len(n.Spec.PodCIDRs) > 0, nil } evt, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition) diff --git a/cmd/kube-proxy/app/server_others_test.go b/cmd/kube-proxy/app/server_others_test.go index 7fa574a9509..a1f3f691180 100644 --- a/cmd/kube-proxy/app/server_others_test.go +++ b/cmd/kube-proxy/app/server_others_test.go @@ -31,9 +31,12 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" netutils "k8s.io/utils/net" clientsetfake "k8s.io/client-go/kubernetes/fake" + clientgotesting "k8s.io/client-go/testing" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" @@ -743,3 +746,52 @@ detectLocalMode: "BridgeInterface"`) tearDown(file, tempDir) } } + +func Test_waitForPodCIDR(t *testing.T) { + expected := []string{"192.168.0.0/24", "fd00:1:2::/64"} + nodeName := "test-node" + oldNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + ResourceVersion: "1000", + }, + Spec: v1.NodeSpec{ + PodCIDR: "10.0.0.0/24", + PodCIDRs: []string{"10.0.0.0/24", "2001:db2:1/64"}, + }, + } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + ResourceVersion: "1", + }, + } + updatedNode := node.DeepCopy() + updatedNode.Spec.PodCIDRs = expected + updatedNode.Spec.PodCIDR = expected[0] + + // start with the new node + client := clientsetfake.NewSimpleClientset() + client.AddReactor("list", "nodes", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + obj := &v1.NodeList{} + return true, obj, nil + }) + fakeWatch := watch.NewFake() + client.PrependWatchReactor("nodes", clientgotesting.DefaultWatchReactor(fakeWatch, nil)) + + go func() { + fakeWatch.Add(node) + // receive a delete event for the old node + fakeWatch.Delete(oldNode) + // set the PodCIDRs on the new node + fakeWatch.Modify(updatedNode) + }() + got, err := waitForPodCIDR(client, 
node.Name) + if err != nil { + t.Errorf("waitForPodCIDR() unexpected error %v", err) + return + } + if !reflect.DeepEqual(got.Spec.PodCIDRs, expected) { + t.Errorf("waitForPodCIDR() got %v expected to be %v ", got.Spec.PodCIDRs, expected) + } +} diff --git a/pkg/proxy/node.go b/pkg/proxy/node.go new file mode 100644 index 00000000000..007699a3f7b --- /dev/null +++ b/pkg/proxy/node.go @@ -0,0 +1,81 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "reflect" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/proxy/config" +) + +// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned +// Implements the config.NodeHandler interface +// https://issues.k8s.io/111321 +type NodePodCIDRHandler struct { + mu sync.Mutex + podCIDRs []string +} + +var _ config.NodeHandler = &NodePodCIDRHandler{} + +// OnNodeAdd is a handler for Node creates. 
+func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) { + n.mu.Lock() + defer n.mu.Unlock() + + podCIDRs := node.Spec.PodCIDRs + // initialize podCIDRs + if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 { + klog.InfoS("Setting current PodCIDRs", "PodCIDRs", podCIDRs) + n.podCIDRs = podCIDRs + return + } + if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { + klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting", + "node", klog.KObj(node), "New Node PodCIDRs", podCIDRs, "Old Node PodCIDRs", n.podCIDRs) + panic("Current Node PodCIDRs are different than previous PodCIDRs, restarting") + } +} + +// OnNodeUpdate is a handler for Node updates. +func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) { + n.mu.Lock() + defer n.mu.Unlock() + podCIDRs := node.Spec.PodCIDRs + // initialize podCIDRs + if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 { + klog.InfoS("Setting current PodCIDRs", "PodCIDRs", podCIDRs) + n.podCIDRs = podCIDRs + return + } + if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { + klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting", + "node", klog.KObj(node), "New Node PodCIDRs", podCIDRs, "Old Node PodCIDRs", n.podCIDRs) + panic("Current Node PodCIDRs are different than previous PodCIDRs, restarting") + } +} + +// OnNodeDelete is a handler for Node deletes. +func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) { + klog.ErrorS(nil, "Current Node is being deleted", "node", klog.KObj(node)) +} + +// OnNodeSynced is a handler for Node syncs. +func (n *NodePodCIDRHandler) OnNodeSynced() {} diff --git a/pkg/proxy/node_test.go b/pkg/proxy/node_test.go new file mode 100644 index 00000000000..ab20130b033 --- /dev/null +++ b/pkg/proxy/node_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestNodePodCIDRHandlerAdd(t *testing.T) { + tests := []struct { + name string + oldNodePodCIDRs []string + newNodePodCIDRs []string + expectPanic bool + }{ + { + name: "both empty", + }, + { + name: "initialized correctly", + newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + }, + { + name: "already initialized and different node", + oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, + expectPanic: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n := &NodePodCIDRHandler{ + podCIDRs: tt.oldNodePodCIDRs, + } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + ResourceVersion: "1", + }, + Spec: v1.NodeSpec{ + PodCIDRs: tt.newNodePodCIDRs, + }, + } + defer func() { + r := recover() + if r == nil && tt.expectPanic { + t.Errorf("The code did not panic") + } else if r != nil && !tt.expectPanic { + t.Errorf("The code did panic") + } + }() + n.OnNodeAdd(node) + }) + } +} + +func TestNodePodCIDRHandlerUpdate(t *testing.T) { + tests := []struct { + name string + oldNodePodCIDRs []string + newNodePodCIDRs []string + expectPanic bool + }{ + { + name: "both empty", + }, + { + name: "initialize", + newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + }, + { + name: "same node", + oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + newNodePodCIDRs: []string{"192.168.1.0/24", 
"fd00:1:2:3::/64"}, + }, + { + name: "different nodes", + oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, + expectPanic: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n := &NodePodCIDRHandler{ + podCIDRs: tt.oldNodePodCIDRs, + } + oldNode := &v1.Node{} + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + ResourceVersion: "1", + }, + Spec: v1.NodeSpec{ + PodCIDRs: tt.newNodePodCIDRs, + }, + } + defer func() { + r := recover() + if r == nil && tt.expectPanic { + t.Errorf("The code did not panic") + } else if r != nil && !tt.expectPanic { + t.Errorf("The code did panic") + } + }() + n.OnNodeUpdate(oldNode, node) + }) + } +} diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index d68aeb40f94..b58f67af335 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -145,6 +145,9 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m // * All of the endpoints for this Service have a topology hint // * At least one endpoint for this Service is hinted for this node's zone. 
func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool { + if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + return false + } hintsAnnotation := svcInfo.HintsAnnotation() if hintsAnnotation != "Auto" && hintsAnnotation != "auto" { if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" { diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index 77087a66bbf..362af7f2d0e 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -91,6 +91,20 @@ func TestCategorizeEndpoints(t *testing.T) { clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), localEndpoints: nil, }, { + name: "hints disabled, hints annotation == auto", + hintsEnabled: false, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, + }, { + name: "hints, hints annotation == aUto (wrong capitalization), hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},