From 4cd1eacbc1f90e60061818d1b4713f78e78de0f9 Mon Sep 17 00:00:00 2001 From: Hanlin Shi Date: Thu, 7 Jan 2021 23:50:00 +0000 Subject: [PATCH] Add rule to allow healthcheck nodeport traffic in filter table 1. For iptables mode, add KUBE-NODEPORTS chain in filter table. Add rules to allow healthcheck node port traffic. 2. For ipvs mode, add KUBE-NODE-PORT chain in filter table. Add KUBE-HEALTH-CHECK-NODE-PORT ipset to allow traffic to healthcheck node port. --- pkg/proxy/iptables/proxier.go | 18 +++- pkg/proxy/iptables/proxier_test.go | 164 ++++++++++++++++++++++++++++- pkg/proxy/ipvs/ipset.go | 3 + pkg/proxy/ipvs/proxier.go | 27 +++++ pkg/proxy/ipvs/proxier_test.go | 125 ++++++++++++++++++++++ pkg/util/iptables/testing/fake.go | 2 + 6 files changed, 336 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a4cbb1fadff..d888cc95fda 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -395,6 +395,7 @@ type iptablesJumpChain struct { var iptablesJumpChains = []iptablesJumpChain{ {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, + {utiliptables.TableFilter, kubeNodePortsChain, utiliptables.ChainInput, "kubernetes health check service ports", nil}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, @@ -479,7 +480,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { filterChains := bytes.NewBuffer(nil) filterRules := bytes.NewBuffer(nil) utilproxy.WriteLine(filterChains, "*filter") - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { + for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { if _, found := existingFilterChains[chain]; found { chainString := string(chain) utilproxy.WriteBytesLine(filterChains, existingFilterChains[chain]) @@ -933,7 +934,7 @@ func (proxier *Proxier) syncProxyRules() { // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). - for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { + for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { if chain, ok := existingFilterChains[chainName]; ok { utilproxy.WriteBytesLine(proxier.filterChains, chain) } else { @@ -1337,6 +1338,19 @@ func (proxier *Proxier) syncProxyRules() { } } + // Capture healthCheckNodePorts. + if svcInfo.HealthCheckNodePort() != 0 { + // no matter if node has local endpoints, healthCheckNodePorts + // need to add a rule to accept the incoming connection + utilproxy.WriteLine(proxier.filterRules, + "-A", string(kubeNodePortsChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcNameString), + "-m", "tcp", "-p", "tcp", + "--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()), + "-j", "ACCEPT", + ) + } + if !hasEndpoints { continue } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index e3dd8d5c4bd..a5097cfbaf9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -454,6 +454,14 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { } } +// fakeCloseable implements utilproxy.Closeable +type fakeCloseable struct{} + +// Close fakes out the close() used by syncProxyRules to release a local port. +func (f *fakeCloseable) Close() error { + return nil +} + // fakePortOpener implements portOpener. type fakePortOpener struct { openPorts []*utilproxy.LocalPort @@ -463,7 +471,7 @@ type fakePortOpener struct { // to lock a local port. func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { f.openPorts = append(f.openPorts, lp) - return nil, nil + return &fakeCloseable{}, nil } const testHostname = "test-hostname" @@ -913,6 +921,50 @@ func TestNodePort(t *testing.T) { } } +func TestHealthCheckNodePort(t *testing.T) { + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, false) + svcIP := "10.20.30.42" + svcPort := 80 + svcNodePort := 3001 + svcHealthCheckNodePort := 30000 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "LoadBalancer" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + svc.Spec.HealthCheckNodePort = int32(svcHealthCheckNodePort) + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }), + ) + + itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0} + addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}} + itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0} + addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}} + fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs) + fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) + fp.nodePortAddresses = []string{"127.0.0.1/16"} + + fp.syncProxyRules() + + kubeNodePortsRules := ipt.GetRules(string(kubeNodePortsChain)) + if !hasJump(kubeNodePortsRules, iptablestest.Accept, "", svcHealthCheckNodePort) { + errorf(fmt.Sprintf("Failed to find Accept rule"), kubeNodePortsRules, t) + } +} + func TestMasqueradeRule(t *testing.T) { for _, testcase := range []bool{false, true} { ipt := iptablestest.NewFake().SetHasRandomFully(testcase) @@ -2594,6 +2646,7 @@ func TestEndpointSliceE2E(t *testing.T) { :KUBE-SERVICES - [0:0] :KUBE-EXTERNAL-SERVICES - [0:0] :KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT @@ -2686,4 +2739,113 @@ COMMIT assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) } +func TestHealthCheckNodePortE2E(t *testing.T) { + expectedIPTables := `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-NODEPORTS -m comment --comment "ns1/svc1 health check node port" -m tcp -p tcp --dport 30000 -j ACCEPT +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -s 127.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -j KUBE-XLB-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +` + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, true) + fp.OnServiceSynced() + fp.OnEndpointsSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), NodePort: 30010, Protocol: v1.ProtocolTCP}}, + Type: "LoadBalancer", + HealthCheckNodePort: 30000, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + }, + } + fp.OnServiceAdd(svc) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "node2"}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "node3"}, + }, { // not ready endpoints should be ignored + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)}, + Topology: map[string]string{"kubernetes.io/hostname": "node4"}, + }}, + } + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + assert.Equal(t, expectedIPTables, fp.iptablesData.String()) + + fp.OnServiceDelete(svc) + fp.syncProxyRules() + assert.NotEqual(t, expectedIPTables, fp.iptablesData.String()) +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 4ed9790cc7c..02eb60d8074 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -75,6 +75,9 @@ const ( kubeNodePortLocalSetSCTPComment = "Kubernetes nodeport SCTP port with externalTrafficPolicy=local with type 'hash ip:port'" kubeNodePortLocalSetSCTP = "KUBE-NODE-PORT-LOCAL-SCTP-HASH" + + kubeHealthCheckNodePortSetComment = "Kubernetes health check node port" + kubeHealthCheckNodePortSet = "KUBE-HEALTH-CHECK-NODE-PORT" ) // IPSetVersioner can query the current ipset version. diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 86c149964ad..1dfafe7d6e0 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -106,6 +106,7 @@ var iptablesJumpChain = []struct { {utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"}, {utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"}, {utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"}, + {utiliptables.TableFilter, utiliptables.ChainInput, KubeNodePortChain, "kubernetes health check rules"}, } var iptablesChains = []struct { @@ -119,6 +120,7 @@ var iptablesChains = []struct { {utiliptables.TableNAT, KubeLoadBalancerChain}, {utiliptables.TableNAT, KubeMarkMasqChain}, {utiliptables.TableFilter, KubeForwardChain}, + {utiliptables.TableFilter, KubeNodePortChain}, } var iptablesEnsureChains = []struct { @@ -161,6 +163,7 @@ var ipsetInfo = []struct { {kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment}, {kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment}, {kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment}, + {kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment}, } // ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to @@ -1581,6 +1584,22 @@ func (proxier *Proxier) syncProxyRules() { } } } + + if svcInfo.HealthCheckNodePort() != 0 { + nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet] + entry := &utilipset.Entry{ + // No need to provide ip info + Port: svcInfo.HealthCheckNodePort(), + Protocol: "tcp", + SetType: utilipset.BitmapPort, + } + + if valid := nodePortSet.validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + continue + } + nodePortSet.activeEntries.Insert(entry.String()) + } } // sync ipset entries @@ -1817,6 +1836,14 @@ func (proxier *Proxier) writeIptablesRules() { "-j", "ACCEPT", ) + // Add rule to accept traffic towards health check node port + utilproxy.WriteLine(proxier.filterRules, + "-A", string(KubeNodePortChain), + "-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(), + "-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst", + "-j", "ACCEPT", + ) + // Install the kubernetes-specific postrouting rules. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 11cd73fd810..46a0c4c6216 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -1920,6 +1920,75 @@ func TestOnlyLocalNodePorts(t *testing.T) { checkIptables(t, ipt, epIpt) } +func TestHealthCheckNodePort(t *testing.T) { + ipt, fp := buildFakeProxier() + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3000 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + + sampleSvc := makeTestService(svcPortName.Namespace, "", func(svc *v1.Service) { + svc.Spec.Type = "LoadBalancer" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }) + + svc1, svc2, invalidSvc3 := *sampleSvc, *sampleSvc, *sampleSvc + svc1.Name, svc1.Spec.HealthCheckNodePort = "valid-svc1", 30000 + svc2.Name, svc2.Spec.HealthCheckNodePort = "valid-svc2", 30001 + // make svc3 invalid by setting external traffic policy to cluster + invalidSvc3.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster + invalidSvc3.Name, invalidSvc3.Spec.HealthCheckNodePort = "invalid-svc3", 30002 + + makeServiceMap(fp, + &svc1, + &svc2, + &invalidSvc3, + ) + + itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0} + addrs := []net.Addr{proxyutiltest.AddrStruct{Val: "100.101.102.103/24"}} + itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0} + addrs1 := []net.Addr{proxyutiltest.AddrStruct{Val: "2001:db8::0/64"}} + fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs) + fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) + fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"} + + fp.syncProxyRules() + + // check ipSet rules + makeTCPEntry := func(port int) *utilipset.Entry { + return &utilipset.Entry{ + Port: port, + Protocol: strings.ToLower(string(v1.ProtocolTCP)), + SetType: utilipset.BitmapPort, + } + } + epIPSet := netlinktest.ExpectedIPSet{ + // healthcheck node port set should only contain valid HC node ports + kubeHealthCheckNodePortSet: {makeTCPEntry(30000), makeTCPEntry(30001)}, + } + checkIPSet(t, fp, epIPSet) + + // Check iptables chain and rules + epIpt := netlinktest.ExpectedIptablesChain{ + string(KubeNodePortChain): {{ + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, + }}, + } + checkIptables(t, ipt, epIpt) +} + func TestLoadBalanceSourceRanges(t *testing.T) { ipt, fp := buildFakeProxier() @@ -4295,6 +4364,61 @@ func TestEndpointSliceE2E(t *testing.T) { assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } + +func TestHealthCheckNodePortE2E(t *testing.T) { + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol) + fp.servicesSynced = true + fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + + svc := v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, + Type: "LoadBalancer", + HealthCheckNodePort: 30000, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + }, + } + fp.OnServiceAdd(&svc) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after service's being created + assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"]) + activeEntries1 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries + assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT") + assert.Equal(t, true, activeEntries1.Has("30000"), "Expected activeEntries to reference hc node port in spec") + + // Update health check node port in the spec + newSvc := svc + newSvc.Spec.HealthCheckNodePort = 30001 + fp.OnServiceUpdate(&svc, &newSvc) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after service's being updated + assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"]) + activeEntries2 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries + assert.Equal(t, 1, activeEntries2.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT") + assert.Equal(t, true, activeEntries2.Has("30001"), "Expected activeEntries to reference updated hc node port in spec") + + fp.OnServiceDelete(&svc) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"]) + activeEntries3 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries + assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-HEALTH-CHECK-NODE-PORT") +} + func TestFilterCIDRs(t *testing.T) { var cidrList []string var cidrs []string @@ -4339,6 +4463,7 @@ func TestCreateAndLinkKubeChain(t *testing.T) { :KUBE-MARK-MASQ - [0:0] ` expectedFilterChains := `:KUBE-FORWARD - [0:0] +:KUBE-NODE-PORT - [0:0] ` assert.Equal(t, expectedNATChains, fp.natChains.String()) assert.Equal(t, expectedFilterChains, fp.filterChains.String()) diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go index 46b5641b499..b0ac31e7109 100644 --- a/pkg/util/iptables/testing/fake.go +++ b/pkg/util/iptables/testing/fake.go @@ -38,6 +38,8 @@ const ( Jump = "-j " // Reject specifies the reject target Reject = "REJECT" + // Accept specifies the accept target + Accept = "ACCEPT" // ToDest represents the flag used to specify the destination address in DNAT ToDest = "--to-destination " // Recent represents the sub-command recent that allows to dynamically create list of IP address to match against