From be92fc83e2e385b8b01c327c3eeee7f64964faa3 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 10 Mar 2021 21:36:31 -0500 Subject: [PATCH 01/17] proxier: simplify toplogy FilterLocalEndpoint function Signed-off-by: Andrew Sy Kim --- pkg/proxy/topology.go | 20 ++------- pkg/proxy/topology_test.go | 89 ++++++++++---------------------------- 2 files changed, 26 insertions(+), 83 deletions(-) diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index 0e0e85461a2..c66c721b72a 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -32,7 +32,7 @@ func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[s } if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { - return filterEndpointsInternalTrafficPolicy(svcInfo.InternalTrafficPolicy(), endpoints) + return FilterLocalEndpoint(endpoints) } if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { @@ -85,20 +85,8 @@ func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, node return filteredEndpoints } -// filterEndpointsInternalTrafficPolicy returns the node local endpoints based -// on configured InternalTrafficPolicy. -// -// If ServiceInternalTrafficPolicy feature gate is off, returns the original -// EndpointSlice. -// Otherwise, if InternalTrafficPolicy is Local, only return the node local endpoints. 
-func filterEndpointsInternalTrafficPolicy(internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType, endpoints []Endpoint) []Endpoint { - if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { - return endpoints - } - if internalTrafficPolicy == nil || *internalTrafficPolicy == v1.ServiceInternalTrafficPolicyCluster { - return endpoints - } - +// FilterLocalEndpoint returns the node local endpoints +func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint { var filteredEndpoints []Endpoint // Get all the local endpoints @@ -108,7 +96,5 @@ func filterEndpointsInternalTrafficPolicy(internalTrafficPolicy *v1.ServiceInter } } - // When internalTrafficPolicy is Local, only return the node local - // endpoints return filteredEndpoints } diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index 8c207cfdd9e..7d686cf2a47 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -335,93 +335,50 @@ func Test_filterEndpointsWithHints(t *testing.T) { } } -func Test_filterEndpointsInternalTrafficPolicy(t *testing.T) { - cluster := v1.ServiceInternalTrafficPolicyCluster - local := v1.ServiceInternalTrafficPolicyLocal - +func TestFilterLocalEndpoint(t *testing.T) { testCases := []struct { - name string - internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType - endpoints []Endpoint - expected []Endpoint - featureGateOn bool + name string + endpoints []Endpoint + expected []Endpoint }{ { - name: "no internalTrafficPolicy with empty endpoints", - internalTrafficPolicy: nil, - endpoints: []Endpoint{}, - expected: []Endpoint{}, - featureGateOn: true, + name: "with empty endpoints", + endpoints: []Endpoint{}, + expected: nil, }, { - name: "no internalTrafficPolicy with non-empty endpoints", - internalTrafficPolicy: nil, - endpoints: []Endpoint{ - &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - }, - expected: []Endpoint{ - 
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - }, - featureGateOn: true, - }, - { - name: "internalTrafficPolicy is cluster", - internalTrafficPolicy: &cluster, - endpoints: []Endpoint{ - &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - }, - expected: []Endpoint{ - &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - }, - featureGateOn: true, - }, - { - name: "internalTrafficPolicy is local with non-zero local endpoints", - internalTrafficPolicy: &local, - endpoints: []Endpoint{ - &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - }, - expected: []Endpoint{ - &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - }, - featureGateOn: true, - }, - { - name: "internalTrafficPolicy is local with zero local endpoints", - internalTrafficPolicy: &local, + name: "all endpoints not local", endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - &BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false}, }, - expected: nil, - featureGateOn: true, + expected: nil, }, { - name: "feature gate is off, internalTrafficPolicy is local with non-empty endpoints", - internalTrafficPolicy: &local, + name: "all endpoints are local", endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - &BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: true}, + }, + expected: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: true}, + }, + }, + { + name: "some endpoints are local", + endpoints: 
[]Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, }, expected: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false}, - &BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false}, }, - featureGateOn: false, }, } for _, tc := range testCases { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, tc.featureGateOn)() t.Run(tc.name, func(t *testing.T) { - filteredEndpoint := filterEndpointsInternalTrafficPolicy(tc.internalTrafficPolicy, tc.endpoints) + filteredEndpoint := FilterLocalEndpoint(tc.endpoints) if !reflect.DeepEqual(filteredEndpoint, tc.expected) { t.Errorf("expected %v, got %v", tc.expected, filteredEndpoint) } From 732635fd4b2f9a532f79dcb564ce5c0c74d79605 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 10 Mar 2021 11:07:44 -0500 Subject: [PATCH 02/17] proxier/iptables: fallback to terminating endpoints if there are no ready endpoints Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier.go | 64 ++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 0e7fb1a73cb..d745689d9e9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1033,16 +1033,7 @@ func (proxier *Proxier) syncProxyRules() { // Service does not have conflicting configuration such as // externalTrafficPolicy=Local. 
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) - - readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints)) - for _, endpoint := range allEndpoints { - if !endpoint.IsReady() { - continue - } - - readyEndpoints = append(readyEndpoints, endpoint) - } - hasEndpoints := len(readyEndpoints) > 0 + hasEndpoints := len(allEndpoints) > 0 svcChain := svcInfo.servicePortChainName if hasEndpoints { @@ -1365,7 +1356,7 @@ func (proxier *Proxier) syncProxyRules() { endpoints = endpoints[:0] endpointChains = endpointChains[:0] var endpointChain utiliptables.Chain - for _, ep := range readyEndpoints { + for _, ep := range allEndpoints { epInfo, ok := ep.(*endpointsInfo) if !ok { klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String()) @@ -1401,16 +1392,33 @@ func (proxier *Proxier) syncProxyRules() { } } - // Now write loadbalancing & DNAT rules. - n := len(endpointChains) - localEndpointChains := make([]utiliptables.Chain, 0) + // Firstly, categorize each endpoint into three buckets: + // 1. all endpoints that are ready and NOT terminating. + // 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local + // 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local + readyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) + readyEndpoints := make([]*endpointsInfo, 0, len(allEndpoints)) + localReadyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) + localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) for i, endpointChain := range endpointChains { - // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. 
- if svcInfo.NodeLocalExternal() && endpoints[i].IsLocal { - localEndpointChains = append(localEndpointChains, endpointChains[i]) + if endpoints[i].Ready { + readyEndpointChains = append(readyEndpointChains, endpointChain) + readyEndpoints = append(readyEndpoints, endpoints[i]) } - epIP := endpoints[i].IP() + if svc.NodeLocalExternal() && endpoints[i].IsLocal { + if endpoints[i].Ready { + localReadyEndpointChains = append(localReadyEndpointChains, endpointChain) + } else if endpoints[i].Serving && endpoints[i].Terminating { + localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain) + } + } + } + + // Now write loadbalancing & DNAT rules. + numReadyEndpoints := len(readyEndpointChains) + for i, endpointChain := range readyEndpointChains { + epIP := readyEndpoints[i].IP() if epIP == "" { // Error parsing this endpoint has been logged. Skip to next endpoint. continue @@ -1419,16 +1427,26 @@ func (proxier *Proxier) syncProxyRules() { // Balancing rules in the per-service chain. args = append(args[:0], "-A", string(svcChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) - if i < (n - 1) { + if i < (numReadyEndpoints - 1) { // Each rule is a probabilistic match. args = append(args, "-m", "statistic", "--mode", "random", - "--probability", proxier.probability(n-i)) + "--probability", proxier.probability(numReadyEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) utilproxy.WriteLine(proxier.natRules, args...) + } + + // Every endpoint gets a chain, regardless of its state. This is required later since we may + // want to jump to endpoint chains that are terminating. + for i, endpointChain := range endpointChains { + epIP := endpoints[i].IP() + if epIP == "" { + // Error parsing this endpoint has been logged. Skip to next endpoint. + continue + } // Rules in the per-endpoint chain. 
args = append(args[:0], "-A", string(endpointChain)) @@ -1474,6 +1492,12 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) + // Prefer local ready endpoint chains, but fall back to ready terminating if none exist + localEndpointChains := localReadyEndpointChains + if len(localEndpointChains) == 0 { + localEndpointChains = localServingTerminatingEndpointChains + } + numLocalEndpoints := len(localEndpointChains) if numLocalEndpoints == 0 { // Blackhole all traffic since there are no local endpoints From b54c0568d8c9bee4e1e6c4861bf3138fbd23ee1d Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 10 Mar 2021 11:08:19 -0500 Subject: [PATCH 03/17] proxier/iptables: add unit tests for falling back to terminating endpoints Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier_test.go | 309 ++++++++++++++++++++++++++++- 1 file changed, 301 insertions(+), 8 deletions(-) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index ec4328604b0..407693a8095 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -2967,6 +2967,7 @@ COMMIT :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] :KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] :KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE @@ -2974,14 +2975,16 @@ COMMIT -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ -A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT ` @@ -3029,7 +3032,7 @@ COMMIT Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, Topology: map[string]string{"kubernetes.io/hostname": "node3"}, - }, { // not ready endpoints should be 
ignored + }, { Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)}, Topology: map[string]string{"kubernetes.io/hostname": "node4"}, @@ -3067,6 +3070,7 @@ COMMIT :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] :KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] :KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE @@ -3076,14 +3080,16 @@ COMMIT -A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -s 127.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -j KUBE-XLB-AQI2S6QIMU7PVVRP -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT 
--to-destination 10.0.1.3:80 +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP @@ -3139,7 +3145,7 @@ COMMIT Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, Topology: map[string]string{"kubernetes.io/hostname": "node3"}, - }, { // not ready endpoints should be ignored + }, { Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)}, Topology: map[string]string{"kubernetes.io/hostname": "node4"}, @@ -3397,12 +3403,12 @@ COMMIT -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ -A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -3575,3 +3581,290 @@ COMMIT } } } + +// Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating +// endpoints, only the ready endpoints are used. 
+func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { + expectedIPTablesWithSlice := `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype 
--src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +` + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, true) + fp.OnServiceSynced() + fp.OnEndpointsSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), 
+ Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since there are ready non-terminating endpoints + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since there are ready non-terminating endpoints + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since it's not local + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "host-1"}, + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + t.Log(fp.iptablesData.String()) + assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String()) + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) +} + +// Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating +// endpoints, we fall back to those endpoints. 
+func Test_EndpointSliceOnlyReadyTerminatingLocal(t *testing.T) { + expectedIPTablesWithSlice := `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-VLJB2F747S6W7EX4 +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +` 
+ + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, true) + fp.OnServiceSynced() + fp.OnEndpointsSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, + }, + }) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored since it is a not ready terminating endpoint + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": 
testHostname}, + }, + { + // this endpoint should be ignored since it is on another host. + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + t.Log(fp.iptablesData.String()) + assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String()) + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) +} From 9d4e24aa32ddbf5f76bd554d8f7463ffd9461311 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 10 Mar 2021 21:35:54 -0500 Subject: [PATCH 04/17] proxier/ipvs: fall back to ready terminating if no ready endpoint exists Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier.go | 48 +++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 48b61d636dc..ee688ff870c 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1237,7 +1237,11 @@ func (proxier *Proxier) syncProxyRules() { activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. 
- if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil { + internalNodeLocal := false + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { + internalNodeLocal = true + } + if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) } } else { @@ -1319,9 +1323,7 @@ func (proxier *Proxier) syncProxyRules() { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true - onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal() - onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal() - if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) } } else { @@ -1422,7 +1424,7 @@ func (proxier *Proxier) syncProxyRules() { if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) } } else { @@ -1590,7 +1592,7 @@ func (proxier *Proxier) syncProxyRules() { // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. 
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) } } else { @@ -2041,7 +2043,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, return nil } -func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, onlyNodeLocalEndpointsForInternal bool, vs *utilipvs.VirtualServer) error { +func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error { appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs) if err != nil { klog.Errorf("Failed to get IPVS service, error: %v", err) @@ -2053,8 +2055,13 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // curEndpoints represents IPVS destinations listed from current system. curEndpoints := sets.NewString() - // newEndpoints represents Endpoints watched from API Server. - newEndpoints := sets.NewString() + // readyEndpoints represents Endpoints watched from API Server. + readyEndpoints := sets.NewString() + // localReadyEndpoints represents local endpoints that are ready and NOT terminating. + localReadyEndpoints := sets.NewString() + // localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating. + // Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic. 
+ localReadyTerminatingEndpoints := sets.NewString() curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) if err != nil { @@ -2079,15 +2086,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } for _, epInfo := range endpoints { - if !epInfo.IsReady() { - continue + if epInfo.IsReady() { + readyEndpoints.Insert(epInfo.String()) } - if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() { - continue + if onlyNodeLocalEndpoints && epInfo.GetIsLocal() { + if epInfo.IsReady() { + localReadyEndpoints.Insert(epInfo.String()) + } else if epInfo.IsServing() && epInfo.IsTerminating() { + localReadyTerminatingEndpoints.Insert(epInfo.String()) + } } + } - newEndpoints.Insert(epInfo.String()) + newEndpoints := readyEndpoints + if onlyNodeLocalEndpoints { + newEndpoints = localReadyEndpoints + + if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { + if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 { + newEndpoints = localReadyTerminatingEndpoints + } + } } // Create new endpoints From 55881093d8a6a9118343c3dcbbabe0ce3555d504 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 10 Mar 2021 21:36:03 -0500 Subject: [PATCH 05/17] proxier/ipvs: add ipvs unit tests for falling back to terminating endpoints Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier_test.go | 343 +++++++++++++++++++++++++++++++++ 1 file changed, 343 insertions(+) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 62d4b91fc25..52eda4440d1 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4653,3 +4653,346 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) { assert.Len(t, realServers2, 0, "Expected 0 real servers") } } + +// Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating +// endpoints, only the ready endpoints are used. 
+func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol) + fp.servicesSynced = true + fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal + + serviceName := "svc1" + // Add initial service + namespaceName := "ns1" + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + InternalTrafficPolicy: &localInternalTrafficPolicy, + ExternalIPs: []string{ + "1.2.3.4", + }, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + 
Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice update + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 4, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") + assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference second (local) pod") + + virtualServers, vsErr := ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 2 virtual server") + + var 
clusterIPServer, externalIPServer *utilipvs.VirtualServer + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + // clusterIP should route to cluster-wide ready endpoints + realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 3, "Expected 3 real servers") + assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") + assert.Equal(t, realServers1[2].String(), "10.0.1.5:80") + + // externalIP should route to local ready + non-terminating endpoints if they exist + realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 2, "Expected 2 real servers") + assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") + + virtualServers, vsErr = ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 1 virtual server") + + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no 
error getting real servers") + assert.Len(t, realServers1, 0, "Expected 0 real servers") + + realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") +} + +// Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating +// endpoints, we fall back to those endpoints. +func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol) + fp.servicesSynced = true + fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + InternalTrafficPolicy: &localInternalTrafficPolicy, + ExternalIPs: []string{ + "1.2.3.4", + }, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + 
Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice update + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") + assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod") + 
assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod") + + virtualServers, vsErr := ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 2 virtual server") + + var clusterIPServer, externalIPServer *utilipvs.VirtualServer + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + // clusterIP should route to cluster-wide Ready endpoints + realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 1, "Expected 1 real servers") + assert.Equal(t, realServers1[0].String(), "10.0.1.5:80") + + // externalIP should fall back to local ready + terminating endpoints + realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 2, "Expected 2 real servers") + assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") + + virtualServers, vsErr = ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 1 virtual server") + + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if 
virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 0, "Expected 0 real servers") + + realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") +} From 4c8b190372aaf6eb6ccce07d1763d2d4e26eb98d Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:05:49 -0400 Subject: [PATCH 06/17] proxier/iptables: reuse the same variable for endpointchains for better memory consumption Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index d745689d9e9..c8b49c1abe3 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -993,6 +993,11 @@ func (proxier *Proxier) syncProxyRules() { // slice = append(slice[:0], ...) endpoints := make([]*endpointsInfo, 0) endpointChains := make([]utiliptables.Chain, 0) + readyEndpoints := make([]*endpointsInfo, 0) + readyEndpointChains := make([]utiliptables.Chain, 0) + localReadyEndpointChains := make([]utiliptables.Chain, 0) + localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0) + // To avoid growing this slice, we arbitrarily set its size to 64, // there is never more than that many arguments for a single line. // Note that even if we go over 64, it will still be correct - it @@ -1396,10 +1401,10 @@ func (proxier *Proxier) syncProxyRules() { // 1. all endpoints that are ready and NOT terminating. // 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local // 3. 
all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local - readyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) - readyEndpoints := make([]*endpointsInfo, 0, len(allEndpoints)) - localReadyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) - localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) + readyEndpointChains = readyEndpointChains[:0] + readyEndpoints := readyEndpoints[:0] + localReadyEndpointChains := localReadyEndpointChains[:0] + localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0] for i, endpointChain := range endpointChains { if endpoints[i].Ready { readyEndpointChains = append(readyEndpointChains, endpointChain) From d82d851d89a3dfb9762d65359ee194211093e225 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:06:01 -0400 Subject: [PATCH 07/17] proxier/iptables: include Service port in unit tests Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 407693a8095..3629cbd8c74 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -3611,8 +3611,8 @@ COMMIT -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 --A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ --A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY @@ -3655,6 +3655,7 @@ COMMIT { Name: "", TargetPort: intstr.FromInt(80), + Port: 80, Protocol: v1.ProtocolTCP, }, }, @@ -3764,8 +3765,8 @@ COMMIT -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 --A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ --A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-VLJB2F747S6W7EX4 -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 @@ -3800,7 +3801,7 @@ COMMIT Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, Selector: map[string]string{"foo": "bar"}, - Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, + Ports: []v1.ServicePort{{Name: "", Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, }, }) From cf9ccf5a8e258df60a767153b37586aea6861854 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:17:35 -0400 Subject: [PATCH 08/17] proxier/ipvs: unit tests should specify Service ports Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 52eda4440d1..eac1051d53e 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4684,6 +4684,7 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { Ports: []v1.ServicePort{ { Name: "", + Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, }, @@ -4857,6 +4858,7 @@ func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { Ports: []v1.ServicePort{ { Name: "", + Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, }, From 25e2c92733c4a5c86974570111ec0f6c20320022 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:53:02 -0400 Subject: [PATCH 09/17] add feature gate ProxyTerminatingEndpoints Signed-off-by: Andrew Sy Kim --- pkg/features/kube_features.go | 8 ++++++++ 1 
file changed, 8 insertions(+) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 167cf72b80a..be7eb7edfc6 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -595,6 +595,13 @@ const ( // Enable Terminating condition in Endpoint Slices. EndpointSliceTerminatingCondition featuregate.Feature = "EndpointSliceTerminatingCondition" + // owner: @andrewsykim + // kep: http://kep.k8s.io/1669 + // alpha: v1.22 + // + // Enable kube-proxy to handle terminating ednpoints when externalTrafficPolicy=Local + ProxyTerminatingEndpoints featuregate.Feature = "ProxyTerminatingEndpoints" + // owner: @robscott // kep: http://kep.k8s.io/752 // alpha: v1.20 @@ -828,6 +835,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS EndpointSlice: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25 EndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta}, EndpointSliceTerminatingCondition: {Default: false, PreRelease: featuregate.Alpha}, + ProxyTerminatingEndpoints: {Default: false, PreRelease: featuregate.Alpha}, EndpointSliceNodeName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, //remove in 1.25 WindowsEndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta}, StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23 From 8c514cb2329c2e35f4d344ab4328d27c29f245e8 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:53:27 -0400 Subject: [PATCH 10/17] proxier/iptables: check feature gate ProxyTerminatingEndpoints Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index c8b49c1abe3..6a1ca47b21f 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1499,7 +1499,7 @@ func (proxier *Proxier) syncProxyRules() { // 
Prefer local ready endpoint chains, but fall back to ready terminating if none exist localEndpointChains := localReadyEndpointChains - if len(localEndpointChains) == 0 { + if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) && len(localEndpointChains) == 0 { localEndpointChains = localServingTerminatingEndpointChains } From f92265f6543e86e55cf6743d51e42be6186f483a Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:54:05 -0400 Subject: [PATCH 11/17] proxier/ipvs: check feature gate ProxyTerminatingEndpoints Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index eac1051d53e..608473a7d61 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4657,6 +4657,8 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) { // Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating // endpoints, only the ready endpoints are used. func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)() + ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) @@ -4831,6 +4833,8 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { // Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating // endpoints, we fall back to those endpoints. 
func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)() + ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) From 68ebd16a2ca1383015243fc7971a320391791ace Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 29 Apr 2021 16:54:26 -0400 Subject: [PATCH 12/17] proxier/iptables: refactor terminating endpoints unit tests with test table and test for feature gate Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier_test.go | 623 ++++++++++++++++++++--------- 1 file changed, 443 insertions(+), 180 deletions(-) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3629cbd8c74..c7f5010572a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -3582,10 +3582,101 @@ COMMIT } } -// Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating +// Test_EndpointSliceWithTerminatingEndpoints tests that when there are local ready and ready + terminating // endpoints, only the ready endpoints are used. 
-func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { - expectedIPTablesWithSlice := `*filter +func Test_EndpointSliceWithTerminatingEndpoints(t *testing.T) { + tcpProtocol := v1.ProtocolTCP + testcases := []struct { + name string + terminatingFeatureGate bool + service *v1.Service + endpointslice *discovery.EndpointSlice + expectedIPTables string + }{ + { + name: "feature gate ProxyTerminatingEndpoints enabled, ready endpoints exist", + terminatingFeatureGate: true, + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Port: 80, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since there are ready non-terminating endpoints + Addresses: 
[]string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since there are ready non-terminating endpoints + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since it's not local + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "host-1"}, + }, + }, + }, + expectedIPTables: `*filter :KUBE-SERVICES - [0:0] :KUBE-EXTERNAL-SERVICES - [0:0] :KUBE-FORWARD - [0:0] @@ -3633,114 +3724,92 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT -` - - ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt, true) - fp.OnServiceSynced() - fp.OnEndpointsSynced() - fp.OnEndpointSlicesSynced() - - serviceName := "svc1" - namespaceName := "ns1" - - fp.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, - Spec: v1.ServiceSpec{ - ClusterIP: "172.20.1.1", - Type: v1.ServiceTypeNodePort, - ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, - Selector: map[string]string{"foo": "bar"}, - Ports: []v1.ServicePort{ - { - Name: "", - TargetPort: 
intstr.FromInt(80), - Port: 80, - Protocol: v1.ProtocolTCP, - }, - }, +`, }, - }) - - tcpProtocol := v1.ProtocolTCP - endpointSlice := &discovery.EndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-1", serviceName), - Namespace: namespaceName, - Labels: map[string]string{discovery.LabelServiceName: serviceName}, - }, - Ports: []discovery.EndpointPort{{ - Name: utilpointer.StringPtr(""), - Port: utilpointer.Int32Ptr(80), - Protocol: &tcpProtocol, - }}, - AddressType: discovery.AddressTypeIPv4, - Endpoints: []discovery.Endpoint{ - { - Addresses: []string{"10.0.1.1"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(true), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(false), + { + name: "feature gate ProxyTerminatingEndpoints disabled, ready endpoints exist", + terminatingFeatureGate: false, + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Port: 80, + Protocol: v1.ProtocolTCP, + }, + }, }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, }, - { - Addresses: []string{"10.0.1.2"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(true), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(false), + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, - }, - { - // this endpoint should be ignored for node ports since there are ready non-terminating endpoints - Addresses: []string{"10.0.1.3"}, - Conditions: 
discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(false), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(true), + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since there are ready non-terminating endpoints + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since there are ready non-terminating endpoints + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since it's not local + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: 
map[string]string{"kubernetes.io/hostname": "host-1"}, + }, }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, }, - { - // this endpoint should be ignored for node ports since there are ready non-terminating endpoints - Addresses: []string{"10.0.1.4"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(false), - Serving: utilpointer.BoolPtr(false), - Terminating: utilpointer.BoolPtr(true), - }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, - }, - { - // this endpoint should be ignored for node ports since it's not local - Addresses: []string{"10.0.1.5"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(true), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(false), - }, - Topology: map[string]string{"kubernetes.io/hostname": "host-1"}, - }, - }, - } - - fp.OnEndpointSliceAdd(endpointSlice) - fp.syncProxyRules() - t.Log(fp.iptablesData.String()) - assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String()) - - fp.OnEndpointSliceDelete(endpointSlice) - fp.syncProxyRules() - assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) -} - -// Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating -// endpoints, we fall back to those endpoints. -func Test_EndpointSliceOnlyReadyTerminatingLocal(t *testing.T) { - expectedIPTablesWithSlice := `*filter + expectedIPTables: `*filter :KUBE-SERVICES - [0:0] :KUBE-EXTERNAL-SERVICES - [0:0] :KUBE-FORWARD - [0:0] @@ -3761,13 +3830,16 @@ COMMIT :KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] :KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] :KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] -A KUBE-POSTROUTING -m mark ! 
--mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ -A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-VLJB2F747S6W7EX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 -A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ @@ -3776,6 +3848,8 @@ COMMIT -A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 -A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ -A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m 
comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP @@ -3783,89 +3857,278 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT -` - - ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt, true) - fp.OnServiceSynced() - fp.OnEndpointsSynced() - fp.OnEndpointSlicesSynced() - - serviceName := "svc1" - namespaceName := "ns1" - - fp.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, - Spec: v1.ServiceSpec{ - ClusterIP: "172.20.1.1", - Type: v1.ServiceTypeNodePort, - ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, - Selector: map[string]string{"foo": "bar"}, - Ports: []v1.ServicePort{{Name: "", Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, +`, }, - }) - - tcpProtocol := v1.ProtocolTCP - endpointSlice := &discovery.EndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-1", serviceName), - Namespace: namespaceName, - Labels: map[string]string{discovery.LabelServiceName: serviceName}, + { + name: "feature gate ProxyTerminatingEndpoints enabled, only terminating endpoints exist", + terminatingFeatureGate: true, + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Port: 80, + Protocol: v1.ProtocolTCP, + 
}, + }, + }, + }, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + // this endpoint should be used since there are only ready terminating endpoints + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be used since there are only ready terminating endpoints + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should not be used since it is both terminating and not ready. 
+ Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + // this endpoint should be ignored for node ports since it's not local + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "host-1"}, + }, + }, + }, + expectedIPTables: `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-XGJFVO3L2O5SRFNT +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT 
+`, }, - Ports: []discovery.EndpointPort{{ - Name: utilpointer.StringPtr(""), - Port: utilpointer.Int32Ptr(80), - Protocol: &tcpProtocol, - }}, - AddressType: discovery.AddressTypeIPv4, - Endpoints: []discovery.Endpoint{ - { - Addresses: []string{"10.0.1.1"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(false), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(true), + { + name: "with ProxyTerminatingEndpoints disabled, only terminating endpoints exist", + terminatingFeatureGate: false, + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Port: 80, + Protocol: v1.ProtocolTCP, + }, + }, }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, }, - { - Addresses: []string{"10.0.1.2"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(false), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(true), + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, - }, - { - // this endpoint should be ignored since it is a not ready terminating endpoint - Addresses: []string{"10.0.1.3"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(false), - Serving: utilpointer.BoolPtr(false), - Terminating: utilpointer.BoolPtr(true), + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: 
[]discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "host-1"}, + }, }, - Topology: map[string]string{"kubernetes.io/hostname": testHostname}, - }, - { - // this endpoint should be ignored since it is on another host. 
- Addresses: []string{"10.0.1.4"}, - Conditions: discovery.EndpointConditions{ - Ready: utilpointer.BoolPtr(true), - Serving: utilpointer.BoolPtr(true), - Terminating: utilpointer.BoolPtr(false), - }, - Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, }, + expectedIPTables: `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80 +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type 
LOCAL -j KUBE-NODEPORTS +COMMIT +`, }, } - fp.OnEndpointSliceAdd(endpointSlice) - fp.syncProxyRules() - t.Log(fp.iptablesData.String()) - assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String()) + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { - fp.OnEndpointSliceDelete(endpointSlice) - fp.syncProxyRules() - assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String()) + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, testcase.terminatingFeatureGate)() + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, true) + fp.OnServiceSynced() + fp.OnEndpointsSynced() + fp.OnEndpointSlicesSynced() + + fp.OnServiceAdd(testcase.service) + + fp.OnEndpointSliceAdd(testcase.endpointslice) + fp.syncProxyRules() + t.Log(fp.iptablesData.String()) + assert.Equal(t, testcase.expectedIPTables, fp.iptablesData.String()) + + fp.OnEndpointSliceDelete(testcase.endpointslice) + fp.syncProxyRules() + assert.NotEqual(t, testcase.expectedIPTables, fp.iptablesData.String()) + }) + } } From 3e459997c831a13d9c22f18debd9eb58c6082349 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 3 May 2021 11:05:16 -0400 Subject: [PATCH 13/17] proxy/ipvs: add a unit tests for when the ProxyTerminatingEndpoint feature gate is disabled Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier_test.go | 176 ++++++++++++++++++++++++++++++++- 1 file changed, 173 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 608473a7d61..11d9bfaca9f 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4795,9 +4795,8 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { // externalIP should route to local ready + non-terminating endpoints if they exist realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") - 
assert.Len(t, realServers2, 2, "Expected 2 real servers") - assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") - assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") + assert.Len(t, realServers2, 1, "Expected 1 real servers") + assert.Equal(t, realServers2[0].String(), "10.0.1.3:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() @@ -5002,3 +5001,174 @@ func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } + +// Test_EndpointSliceOnlyReadyTerminatingLocalWithFeatureGateDisabled tests that when there are only local ready terminating +// endpoints, we fall back to those endpoints. +func Test_EndpointSliceOnlyReadyAndTerminatingLocalWithFeatureGateDisabled(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, false)() + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol) + fp.servicesSynced = true + fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + InternalTrafficPolicy: &localInternalTrafficPolicy, + ExternalIPs: []string{ + "1.2.3.4", + }, + Ports: []v1.ServicePort{ + { + Name: "", + Port: 80, + TargetPort: intstr.FromInt(80), + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + 
endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "another-host"}, + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice update + 
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") + assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod") + + virtualServers, vsErr := ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 2 virtual server") + + var clusterIPServer, externalIPServer *utilipvs.VirtualServer + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + // clusterIP should route to cluster-wide Ready endpoints + realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 1, "Expected 1 real servers") + assert.Equal(t, realServers1[0].String(), "10.0.1.5:80") + + // externalIP should have 0 endpoints since the feature gate is disabled. 
+ realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") + + virtualServers, vsErr = ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 1 virtual server") + + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 0, "Expected 0 real servers") + + realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") +} From 14cc201b58b8d28287aef337a753da1a6037089e Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 27 May 2021 13:00:30 -0400 Subject: [PATCH 14/17] proxy: add test case in TestGetLocalEndpointIPs for when all endpoints are terminating Signed-off-by: Andrew Sy Kim --- pkg/proxy/endpoints_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index d736cd15eb8..a5c678cae77 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -134,6 +134,28 @@ func TestGetLocalEndpointIPs(t *testing.T) { {Namespace: "ns2", Name: "ep2"}: 
sets.NewString("2.2.2.2", "2.2.2.22"), {Namespace: "ns4", Name: "ep4"}: sets.NewString("4.4.4.4", "4.4.4.6"), }, + }, { + // Case[6]: all endpoints are terminating, so getLocalReadyEndpointIPs should return 0 ready endpoints + endpointsMap: EndpointsMap{ + makeServicePortName("ns1", "ep1", "p11", v1.ProtocolTCP): []Endpoint{ + &BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true}, + }, + makeServicePortName("ns2", "ep2", "p22", v1.ProtocolTCP): []Endpoint{ + &BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true, Ready: false, Serving: true, Terminating: true}, + &BaseEndpointInfo{Endpoint: "2.2.2.22:22", IsLocal: true, Ready: false, Serving: true, Terminating: true}, + }, + makeServicePortName("ns2", "ep2", "p23", v1.ProtocolTCP): []Endpoint{ + &BaseEndpointInfo{Endpoint: "2.2.2.3:23", IsLocal: true, Ready: false, Serving: true, Terminating: true}, + }, + makeServicePortName("ns4", "ep4", "p44", v1.ProtocolTCP): []Endpoint{ + &BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true, Ready: false, Serving: true, Terminating: true}, + &BaseEndpointInfo{Endpoint: "4.4.4.5:44", IsLocal: false, Ready: false, Serving: true, Terminating: true}, + }, + makeServicePortName("ns4", "ep4", "p45", v1.ProtocolTCP): []Endpoint{ + &BaseEndpointInfo{Endpoint: "4.4.4.6:45", IsLocal: true, Ready: false, Serving: true, Terminating: true}, + }, + }, + expected: make(map[types.NamespacedName]sets.String, 0), }} for tci, tc := range testCases { From ed4fe0737551bf50b54633acf17beee54a1d5f96 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 27 May 2021 13:25:36 -0400 Subject: [PATCH 15/17] proxy/iptables: add unit test Test_HealthCheckNodePortWhenTerminating for ensuring health check node port fails when all local endpoints are terminating Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier_test.go | 113 +++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/pkg/proxy/iptables/proxier_test.go 
b/pkg/proxy/iptables/proxier_test.go index c7f5010572a..ba4f8aa544c 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -3160,6 +3160,119 @@ COMMIT assert.NotEqual(t, expectedIPTables, fp.iptablesData.String()) } +// Test_HealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating +func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, true) + fp.OnServiceSynced() + fp.OnEndpointsSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, + }, + }) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + 
}, { // not ready endpoints should be ignored + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }}, + } + + fp.OnEndpointSliceAdd(endpointSlice) + result := fp.endpointsMap.Update(fp.endpointsChanges) + if len(result.HCEndpointsLocalIPSize) != 1 { + t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize)) + } + + // set all endpoints to terminating + endpointSliceTerminating := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { // not ready endpoints should be ignored + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + 
Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }}, + } + + fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) + result = fp.endpointsMap.Update(fp.endpointsChanges) + if len(result.HCEndpointsLocalIPSize) != 0 { + t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize)) + } +} + func TestProxierDeleteNodePortStaleUDP(t *testing.T) { fcmd := fakeexec.FakeCmd{} fexec := fakeexec.FakeExec{ From 8eb7e81bc94f708def0b40fd2b7e4f945c34df17 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 27 May 2021 13:27:57 -0400 Subject: [PATCH 16/17] proxy/ipvs: add unit test Test_HealthCheckNodePortWhenTerminating for ensuring health check node port fails when all local endpoints are terminating Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier_test.go | 115 +++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 11d9bfaca9f..532b33bf363 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4424,6 +4424,121 @@ func TestHealthCheckNodePortE2E(t *testing.T) { assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-HEALTH-CHECK-NODE-PORT") } +// Test_HealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating +func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol) + fp.servicesSynced = true + fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + serviceName := "svc1" + namespaceName := "ns1" + + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: 
namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, + }, + }) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { // not ready endpoints should be ignored + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }}, + } + + fp.OnEndpointSliceAdd(endpointSlice) + result := fp.endpointsMap.Update(fp.endpointsChanges) + if len(result.HCEndpointsLocalIPSize) != 1 { + t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize)) + } + + // set all endpoints to terminating + endpointSliceTerminating := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: 
map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { // not ready endpoints should be ignored + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }}, + } + + fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) + result = fp.endpointsMap.Update(fp.endpointsChanges) + if len(result.HCEndpointsLocalIPSize) != 0 { + t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize)) + } +} + func TestFilterCIDRs(t *testing.T) { var cidrList []string var cidrs []string From 1010e6a9d98f406a6687eb33573713893dc5431e Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Fri, 4 Jun 2021 20:13:47 -0400 Subject: [PATCH 17/17] proxier/ipvs: fix test cases 
where ready endpoints were not used Signed-off-by: Andrew Sy Kim --- pkg/proxy/ipvs/proxier_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 532b33bf363..c0e5bd6a8bd 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4910,8 +4910,9 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { // externalIP should route to local ready + non-terminating endpoints if they exist realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") - assert.Len(t, realServers2, 1, "Expected 1 real servers") - assert.Equal(t, realServers2[0].String(), "10.0.1.3:80") + assert.Len(t, realServers2, 2, "Expected 2 real servers") + assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules()