diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index b69033fe2c7..761e6d5be02 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -66,7 +66,6 @@ import ( "k8s.io/kube-proxy/config/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/cluster/ports" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/apis" @@ -544,6 +543,7 @@ type ProxyServer struct { OOMScoreAdj *int32 ConfigSyncPeriod time.Duration HealthzServer healthcheck.ProxierHealthUpdater + localDetectorMode kubeproxyconfig.LocalMode } // createClients creates a kube client and an event client from the given config and masterOverride. @@ -758,20 +758,23 @@ func (s *ProxyServer) Run() error { // function must configure its shared informer event handlers first. informerFactory.Start(wait.NeverStop) - if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - // Make an informer that selects for our nodename. - currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, - informers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String() - })) - nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod) - nodeConfig.RegisterEventHandler(s.Proxier) - go nodeConfig.Run(wait.NeverStop) - - // This has to start after the calls to NewNodeConfig because that must - // configure the shared informer event handler first. - currentNodeInformerFactory.Start(wait.NeverStop) + // Make an informer that selects for our nodename. + currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String() + })) + nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod) + // https://issues.k8s.io/111321 + if s.localDetectorMode == kubeproxyconfig.LocalModeNodeCIDR { + nodeConfig.RegisterEventHandler(&proxy.NodePodCIDRHandler{}) } + nodeConfig.RegisterEventHandler(s.Proxier) + + go nodeConfig.Run(wait.NeverStop) + + // This has to start after the calls to NewNodeConfig because that must + // configure the shared informer event handler first. + currentNodeInformerFactory.Start(wait.NeverStop) // Birth Cry after the birth is successful s.birthCry() diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index e938cdfab7b..647d3211ff3 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -338,6 +338,7 @@ func newProxyServer( OOMScoreAdj: config.OOMScoreAdj, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, HealthzServer: healthzServer, + localDetectorMode: detectLocalMode, }, nil } diff --git a/pkg/proxy/node.go b/pkg/proxy/node.go new file mode 100644 index 00000000000..007699a3f7b --- /dev/null +++ b/pkg/proxy/node.go @@ -0,0 +1,81 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "reflect" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/proxy/config" +) + +// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned +// Implements the config.NodeHandler interface +// https://issues.k8s.io/111321 +type NodePodCIDRHandler struct { + mu sync.Mutex + podCIDRs []string +} + +var _ config.NodeHandler = &NodePodCIDRHandler{} + +// OnNodeAdd is a handler for Node creates. +func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) { + n.mu.Lock() + defer n.mu.Unlock() + + podCIDRs := node.Spec.PodCIDRs + // initialize podCIDRs + if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 { + klog.InfoS("Setting current PodCIDRs", "PodCIDRs", podCIDRs) + n.podCIDRs = podCIDRs + return + } + if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { + klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting", + "node", klog.KObj(node), "New Node PodCIDRs", podCIDRs, "Old Node UID", n.podCIDRs) + panic("Current Node PodCIDRs are different than previous PodCIDRs, restarting") + } +} + +// OnNodeUpdate is a handler for Node updates. +func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) { + n.mu.Lock() + defer n.mu.Unlock() + podCIDRs := node.Spec.PodCIDRs + // initialize podCIDRs + if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 { + klog.InfoS("Setting current PodCIDRs", "PodCIDRs", podCIDRs) + n.podCIDRs = podCIDRs + return + } + if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { + klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting", + "node", klog.KObj(node), "New Node PodCIDRs", podCIDRs, "Old Node UID", n.podCIDRs) + panic("Current Node PodCIDRs are different than previous PodCIDRs, restarting") + } +} + +// OnNodeDelete is a handler for Node deletes. +func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) { + klog.ErrorS(nil, "Current Node is being deleted", "node", klog.KObj(node)) +} + +// OnNodeSynced is a handler for Node syncs. +func (n *NodePodCIDRHandler) OnNodeSynced() {} diff --git a/pkg/proxy/node_test.go b/pkg/proxy/node_test.go new file mode 100644 index 00000000000..ab20130b033 --- /dev/null +++ b/pkg/proxy/node_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestNodePodCIDRHandlerAdd(t *testing.T) { + tests := []struct { + name string + oldNodePodCIDRs []string + newNodePodCIDRs []string + expectPanic bool + }{ + { + name: "both empty", + }, + { + name: "initialized correctly", + newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + }, + { + name: "already initialized and different node", + oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, + expectPanic: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n := &NodePodCIDRHandler{ + podCIDRs: tt.oldNodePodCIDRs, + } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + ResourceVersion: "1", + }, + Spec: v1.NodeSpec{ + PodCIDRs: tt.newNodePodCIDRs, + }, + } + defer func() { + r := recover() + if r == nil && tt.expectPanic { + t.Errorf("The code did not panic") + } else if r != nil && !tt.expectPanic { + t.Errorf("The code did panic") + } + }() + n.OnNodeAdd(node) + }) + } +} + +func TestNodePodCIDRHandlerUpdate(t *testing.T) { + tests := []struct { + name string + oldNodePodCIDRs []string + newNodePodCIDRs []string + expectPanic bool + }{ + { + name: "both empty", + }, + { + name: "initialize", + newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + }, + { + name: "same node", + oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + }, + { + name: "different nodes", + oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, + newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, + expectPanic: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n := &NodePodCIDRHandler{ + podCIDRs: tt.oldNodePodCIDRs, + } + oldNode := &v1.Node{} + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + ResourceVersion: "1", + }, + Spec: v1.NodeSpec{ + PodCIDRs: tt.newNodePodCIDRs, + }, + } + defer func() { + r := recover() + if r == nil && tt.expectPanic { + t.Errorf("The code did not panic") + } else if r != nil && !tt.expectPanic { + t.Errorf("The code did panic") + } + }() + n.OnNodeUpdate(oldNode, node) + }) + } +}