Merge pull request #97238 from andrewsykim/kube-proxy-handle-terminating

kube-proxy handle terminating endpoints
This commit is contained in:
Kubernetes Prow Robot 2021-06-28 20:46:40 -07:00 committed by GitHub
commit db3a216fbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1452 additions and 125 deletions

View File

@ -541,6 +541,13 @@ const (
// Enable Terminating condition in Endpoint Slices.
EndpointSliceTerminatingCondition featuregate.Feature = "EndpointSliceTerminatingCondition"
// owner: @andrewsykim
// kep: http://kep.k8s.io/1669
// alpha: v1.22
//
// Enable kube-proxy to handle terminating ednpoints when externalTrafficPolicy=Local
ProxyTerminatingEndpoints featuregate.Feature = "ProxyTerminatingEndpoints"
// owner: @robscott
// kep: http://kep.k8s.io/752
// alpha: v1.20
@ -779,6 +786,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
EndpointSlice: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
EndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta},
EndpointSliceTerminatingCondition: {Default: false, PreRelease: featuregate.Alpha},
ProxyTerminatingEndpoints: {Default: false, PreRelease: featuregate.Alpha},
EndpointSliceNodeName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, //remove in 1.25
WindowsEndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta},
StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23

View File

@ -134,6 +134,28 @@ func TestGetLocalEndpointIPs(t *testing.T) {
{Namespace: "ns2", Name: "ep2"}: sets.NewString("2.2.2.2", "2.2.2.22"),
{Namespace: "ns4", Name: "ep4"}: sets.NewString("4.4.4.4", "4.4.4.6"),
},
}, {
// Case[6]: all endpoints are terminating,, so getLocalReadyEndpointIPs should return 0 ready endpoints
endpointsMap: EndpointsMap{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolTCP): []Endpoint{
&BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true},
},
makeServicePortName("ns2", "ep2", "p22", v1.ProtocolTCP): []Endpoint{
&BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true, Ready: false, Serving: true, Terminating: true},
&BaseEndpointInfo{Endpoint: "2.2.2.22:22", IsLocal: true, Ready: false, Serving: true, Terminating: true},
},
makeServicePortName("ns2", "ep2", "p23", v1.ProtocolTCP): []Endpoint{
&BaseEndpointInfo{Endpoint: "2.2.2.3:23", IsLocal: true, Ready: false, Serving: true, Terminating: true},
},
makeServicePortName("ns4", "ep4", "p44", v1.ProtocolTCP): []Endpoint{
&BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true, Ready: false, Serving: true, Terminating: true},
&BaseEndpointInfo{Endpoint: "4.4.4.5:44", IsLocal: false, Ready: false, Serving: true, Terminating: true},
},
makeServicePortName("ns4", "ep4", "p45", v1.ProtocolTCP): []Endpoint{
&BaseEndpointInfo{Endpoint: "4.4.4.6:45", IsLocal: true, Ready: false, Serving: true, Terminating: true},
},
},
expected: make(map[types.NamespacedName]sets.String, 0),
}}
for tci, tc := range testCases {

View File

@ -993,6 +993,11 @@ func (proxier *Proxier) syncProxyRules() {
// slice = append(slice[:0], ...)
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
readyEndpoints := make([]*endpointsInfo, 0)
readyEndpointChains := make([]utiliptables.Chain, 0)
localReadyEndpointChains := make([]utiliptables.Chain, 0)
localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0)
// To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line.
// Note that even if we go over 64, it will still be correct - it
@ -1033,16 +1038,7 @@ func (proxier *Proxier) syncProxyRules() {
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints))
for _, endpoint := range allEndpoints {
if !endpoint.IsReady() {
continue
}
readyEndpoints = append(readyEndpoints, endpoint)
}
hasEndpoints := len(readyEndpoints) > 0
hasEndpoints := len(allEndpoints) > 0
svcChain := svcInfo.servicePortChainName
if hasEndpoints {
@ -1365,7 +1361,7 @@ func (proxier *Proxier) syncProxyRules() {
endpoints = endpoints[:0]
endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range readyEndpoints {
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String())
@ -1401,16 +1397,33 @@ func (proxier *Proxier) syncProxyRules() {
}
}
// Now write loadbalancing & DNAT rules.
n := len(endpointChains)
localEndpointChains := make([]utiliptables.Chain, 0)
// Firstly, categorize each endpoint into three buckets:
// 1. all endpoints that are ready and NOT terminating.
// 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local
// 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local
readyEndpointChains = readyEndpointChains[:0]
readyEndpoints := readyEndpoints[:0]
localReadyEndpointChains := localReadyEndpointChains[:0]
localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0]
for i, endpointChain := range endpointChains {
// Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
if svcInfo.NodeLocalExternal() && endpoints[i].IsLocal {
localEndpointChains = append(localEndpointChains, endpointChains[i])
if endpoints[i].Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
readyEndpoints = append(readyEndpoints, endpoints[i])
}
epIP := endpoints[i].IP()
if svc.NodeLocalExternal() && endpoints[i].IsLocal {
if endpoints[i].Ready {
localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
} else if endpoints[i].Serving && endpoints[i].Terminating {
localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
}
}
}
// Now write loadbalancing & DNAT rules.
numReadyEndpoints := len(readyEndpointChains)
for i, endpointChain := range readyEndpointChains {
epIP := readyEndpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
@ -1419,16 +1432,26 @@ func (proxier *Proxier) syncProxyRules() {
// Balancing rules in the per-service chain.
args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
if i < (n - 1) {
if i < (numReadyEndpoints - 1) {
// Each rule is a probabilistic match.
args = append(args,
"-m", "statistic",
"--mode", "random",
"--probability", proxier.probability(n-i))
"--probability", proxier.probability(numReadyEndpoints-i))
}
// The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain))
utilproxy.WriteLine(proxier.natRules, args...)
}
// Every endpoint gets a chain, regardless of its state. This is required later since we may
// want to jump to endpoint chains that are terminating.
for i, endpointChain := range endpointChains {
epIP := endpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}
// Rules in the per-endpoint chain.
args = append(args[:0], "-A", string(endpointChain))
@ -1474,6 +1497,12 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)
// Prefer local ready endpoint chains, but fall back to ready terminating if none exist
localEndpointChains := localReadyEndpointChains
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) && len(localEndpointChains) == 0 {
localEndpointChains = localServingTerminatingEndpointChains
}
numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints

View File

@ -2967,6 +2967,7 @@ COMMIT
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
@ -2974,14 +2975,16 @@ COMMIT
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
@ -3029,7 +3032,7 @@ COMMIT
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node3"},
}, { // not ready endpoints should be ignored
}, {
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
Topology: map[string]string{"kubernetes.io/hostname": "node4"},
@ -3067,6 +3070,7 @@ COMMIT
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
@ -3076,14 +3080,16 @@ COMMIT
-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -s 127.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -j KUBE-XLB-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
@ -3139,7 +3145,7 @@ COMMIT
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node3"},
}, { // not ready endpoints should be ignored
}, {
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
Topology: map[string]string{"kubernetes.io/hostname": "node4"},
@ -3154,6 +3160,119 @@ COMMIT
assert.NotEqual(t, expectedIPTables, fp.iptablesData.String())
}
// Test_HealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating
func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, true)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
},
})
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, { // not ready endpoints should be ignored
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}},
}
fp.OnEndpointSliceAdd(endpointSlice)
result := fp.endpointsMap.Update(fp.endpointsChanges)
if len(result.HCEndpointsLocalIPSize) != 1 {
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize))
}
// set all endpoints to terminating
endpointSliceTerminating := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, { // not ready endpoints should be ignored
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}},
}
fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
result = fp.endpointsMap.Update(fp.endpointsChanges)
if len(result.HCEndpointsLocalIPSize) != 0 {
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize))
}
}
func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
fcmd := fakeexec.FakeCmd{}
fexec := fakeexec.FakeExec{
@ -3397,12 +3516,12 @@ COMMIT
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -3575,3 +3694,554 @@ COMMIT
}
}
}
// Test_EndpointSliceWithTerminatingEndpoints tests that when there are local ready and ready + terminating
// endpoints, only the ready endpoints are used.
func Test_EndpointSliceWithTerminatingEndpoints(t *testing.T) {
tcpProtocol := v1.ProtocolTCP
testcases := []struct {
name string
terminatingFeatureGate bool
service *v1.Service
endpointslice *discovery.EndpointSlice
expectedIPTables string
}{
{
name: "feature gate ProxyTerminatingEndpoints enabled, ready endpoints exist",
terminatingFeatureGate: true,
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{
{
Name: "",
TargetPort: intstr.FromInt(80),
Port: 80,
Protocol: v1.ProtocolTCP,
},
},
},
},
endpointslice: &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", "svc1"),
Namespace: "ns1",
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since there are ready non-terminating endpoints
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since there are ready non-terminating endpoints
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since it's not local
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "host-1"},
},
},
},
expectedIPTables: `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0]
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
},
{
name: "feature gate ProxyTerminatingEndpoints disabled, ready endpoints exist",
terminatingFeatureGate: false,
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{
{
Name: "",
TargetPort: intstr.FromInt(80),
Port: 80,
Protocol: v1.ProtocolTCP,
},
},
},
},
endpointslice: &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", "svc1"),
Namespace: "ns1",
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since there are ready non-terminating endpoints
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since there are ready non-terminating endpoints
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since it's not local
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "host-1"},
},
},
},
expectedIPTables: `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0]
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
},
{
name: "feature gate ProxyTerminatingEndpoints enabled, only terminating endpoints exist",
terminatingFeatureGate: true,
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{
{
Name: "",
TargetPort: intstr.FromInt(80),
Port: 80,
Protocol: v1.ProtocolTCP,
},
},
},
},
endpointslice: &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", "svc1"),
Namespace: "ns1",
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
// this endpoint should be used since there are only ready terminating endpoints
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be used since there are only ready terminating endpoints
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should not be used since it is both terminating and not ready.
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
// this endpoint should be ignored for node ports since it's not local
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "host-1"},
},
},
},
expectedIPTables: `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0]
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 1 for ns1/svc1" -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
},
{
name: "with ProxyTerminatingEndpoints disabled, only terminating endpoints exist",
terminatingFeatureGate: false,
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{
{
Name: "",
TargetPort: intstr.FromInt(80),
Port: 80,
Protocol: v1.ProtocolTCP,
},
},
},
},
endpointslice: &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", "svc1"),
Namespace: "ns1",
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "host-1"},
},
},
},
expectedIPTables: `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
:KUBE-SEP-VLJB2F747S6W7EX4 - [0:0]
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -s 10.0.1.4/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-VLJB2F747S6W7EX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.4:80
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, testcase.terminatingFeatureGate)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, true)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
fp.OnServiceAdd(testcase.service)
fp.OnEndpointSliceAdd(testcase.endpointslice)
fp.syncProxyRules()
t.Log(fp.iptablesData.String())
assert.Equal(t, testcase.expectedIPTables, fp.iptablesData.String())
fp.OnEndpointSliceDelete(testcase.endpointslice)
fp.syncProxyRules()
assert.NotEqual(t, testcase.expectedIPTables, fp.iptablesData.String())
})
}
}

View File

@ -1238,7 +1238,11 @@ func (proxier *Proxier) syncProxyRules() {
activeBindAddrs[serv.Address.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil {
internalNodeLocal := false
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
internalNodeLocal = true
}
if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
}
} else {
@ -1320,9 +1324,7 @@ func (proxier *Proxier) syncProxyRules() {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal()
onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal()
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil {
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
}
} else {
@ -1423,7 +1425,7 @@ func (proxier *Proxier) syncProxyRules() {
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil {
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
}
} else {
@ -1591,7 +1593,7 @@ func (proxier *Proxier) syncProxyRules() {
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil {
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
}
} else {
@ -2042,7 +2044,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
return nil
}
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, onlyNodeLocalEndpointsForInternal bool, vs *utilipvs.VirtualServer) error {
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
if err != nil {
klog.Errorf("Failed to get IPVS service, error: %v", err)
@ -2054,8 +2056,13 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// curEndpoints represents IPVS destinations listed from current system.
curEndpoints := sets.NewString()
// newEndpoints represents Endpoints watched from API Server.
newEndpoints := sets.NewString()
// readyEndpoints represents Endpoints watched from API Server.
readyEndpoints := sets.NewString()
// localReadyEndpoints represents local endpoints that are ready and NOT terminating.
localReadyEndpoints := sets.NewString()
// localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating.
// Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic.
localReadyTerminatingEndpoints := sets.NewString()
curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
if err != nil {
@ -2080,15 +2087,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
}
for _, epInfo := range endpoints {
if !epInfo.IsReady() {
continue
if epInfo.IsReady() {
readyEndpoints.Insert(epInfo.String())
}
if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
continue
if onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
if epInfo.IsReady() {
localReadyEndpoints.Insert(epInfo.String())
} else if epInfo.IsServing() && epInfo.IsTerminating() {
localReadyTerminatingEndpoints.Insert(epInfo.String())
}
}
}
newEndpoints.Insert(epInfo.String())
newEndpoints := readyEndpoints
if onlyNodeLocalEndpoints {
newEndpoints = localReadyEndpoints
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 {
newEndpoints = localReadyTerminatingEndpoints
}
}
}
// Create new endpoints

View File

@ -4424,6 +4424,121 @@ func TestHealthCheckNodePortE2E(t *testing.T) {
assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-HEALTH-CHECK-NODE-PORT")
}
// Test_HealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating
func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol)
fp.servicesSynced = true
fp.endpointsSynced = true
fp.endpointSlicesSynced = true
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
},
})
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, { // not ready endpoints should be ignored
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}},
}
fp.OnEndpointSliceAdd(endpointSlice)
result := fp.endpointsMap.Update(fp.endpointsChanges)
if len(result.HCEndpointsLocalIPSize) != 1 {
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize))
}
// set all endpoints to terminating
endpointSliceTerminating := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, { // not ready endpoints should be ignored
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}},
}
fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
result = fp.endpointsMap.Update(fp.endpointsChanges)
if len(result.HCEndpointsLocalIPSize) != 0 {
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize))
}
}
func TestFilterCIDRs(t *testing.T) {
var cidrList []string
var cidrs []string
@ -4653,3 +4768,523 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) {
assert.Len(t, realServers2, 0, "Expected 0 real servers")
}
}
// Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating
// endpoints, only the ready endpoints are used.
func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)()
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol)
fp.servicesSynced = true
fp.endpointsSynced = true
fp.endpointSlicesSynced = true
localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal
serviceName := "svc1"
// Add initial service
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
InternalTrafficPolicy: &localInternalTrafficPolicy,
ExternalIPs: []string{
"1.2.3.4",
},
Ports: []v1.ServicePort{
{
Name: "",
Port: 80,
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
},
})
// Add initial endpoint slice
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "another-host"},
},
},
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 4, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK")
assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference second (local) pod")
virtualServers, vsErr := ipvs.GetVirtualServers()
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
var clusterIPServer, externalIPServer *utilipvs.VirtualServer
for _, virtualServer := range virtualServers {
if virtualServer.Address.String() == "172.20.1.1" {
clusterIPServer = virtualServer
}
if virtualServer.Address.String() == "1.2.3.4" {
externalIPServer = virtualServer
}
}
// clusterIP should route to cluster-wide ready endpoints
realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 3, "Expected 3 real servers")
assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
assert.Equal(t, realServers1[2].String(), "10.0.1.5:80")
// externalIP should route to local ready + non-terminating endpoints if they exist
realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 2, "Expected 2 real servers")
assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
fp.OnEndpointSliceDelete(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
virtualServers, vsErr = ipvs.GetVirtualServers()
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
assert.Len(t, virtualServers, 2, "Expected 1 virtual server")
for _, virtualServer := range virtualServers {
if virtualServer.Address.String() == "172.20.1.1" {
clusterIPServer = virtualServer
}
if virtualServer.Address.String() == "1.2.3.4" {
externalIPServer = virtualServer
}
}
realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 0, "Expected 0 real servers")
realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 0, "Expected 0 real servers")
}
// Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating
// endpoints, we fall back to those endpoints.
func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)()
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol)
fp.servicesSynced = true
fp.endpointsSynced = true
fp.endpointSlicesSynced = true
localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal
// Add initial service
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
InternalTrafficPolicy: &localInternalTrafficPolicy,
ExternalIPs: []string{
"1.2.3.4",
},
Ports: []v1.ServicePort{
{
Name: "",
Port: 80,
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
},
})
// Add initial endpoint slice
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": "another-host"},
},
{
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "another-host"},
},
},
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK")
assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
virtualServers, vsErr := ipvs.GetVirtualServers()
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
var clusterIPServer, externalIPServer *utilipvs.VirtualServer
for _, virtualServer := range virtualServers {
if virtualServer.Address.String() == "172.20.1.1" {
clusterIPServer = virtualServer
}
if virtualServer.Address.String() == "1.2.3.4" {
externalIPServer = virtualServer
}
}
// clusterIP should route to cluster-wide Ready endpoints
realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 1, "Expected 1 real servers")
assert.Equal(t, realServers1[0].String(), "10.0.1.5:80")
// externalIP should fall back to local ready + terminating endpoints
realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 2, "Expected 2 real servers")
assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
fp.OnEndpointSliceDelete(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
virtualServers, vsErr = ipvs.GetVirtualServers()
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
assert.Len(t, virtualServers, 2, "Expected 1 virtual server")
for _, virtualServer := range virtualServers {
if virtualServer.Address.String() == "172.20.1.1" {
clusterIPServer = virtualServer
}
if virtualServer.Address.String() == "1.2.3.4" {
externalIPServer = virtualServer
}
}
realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 0, "Expected 0 real servers")
realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 0, "Expected 0 real servers")
}
// Test_EndpointSliceOnlyReadyTerminatingLocalWithFeatureGateDisabled tests that when there are only local ready terminating
// endpoints, we fall back to those endpoints.
func Test_EndpointSliceOnlyReadyAndTerminatingLocalWithFeatureGateDisabled(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, false)()
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol)
fp.servicesSynced = true
fp.endpointsSynced = true
fp.endpointSlicesSynced = true
localInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyLocal
// Add initial service
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Type: v1.ServiceTypeNodePort,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
InternalTrafficPolicy: &localInternalTrafficPolicy,
ExternalIPs: []string{
"1.2.3.4",
},
Ports: []v1.ServicePort{
{
Name: "",
Port: 80,
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
},
})
// Add initial endpoint slice
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
},
{
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(true),
},
Topology: map[string]string{"kubernetes.io/hostname": "another-host"},
},
{
Addresses: []string{"10.0.1.5"},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Topology: map[string]string{"kubernetes.io/hostname": "another-host"},
},
},
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK")
assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
virtualServers, vsErr := ipvs.GetVirtualServers()
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
var clusterIPServer, externalIPServer *utilipvs.VirtualServer
for _, virtualServer := range virtualServers {
if virtualServer.Address.String() == "172.20.1.1" {
clusterIPServer = virtualServer
}
if virtualServer.Address.String() == "1.2.3.4" {
externalIPServer = virtualServer
}
}
// clusterIP should route to cluster-wide Ready endpoints
realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 1, "Expected 1 real servers")
assert.Equal(t, realServers1[0].String(), "10.0.1.5:80")
// externalIP should have 0 endpoints since the feature gate is disabled.
realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 0, "Expected 0 real servers")
fp.OnEndpointSliceDelete(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
virtualServers, vsErr = ipvs.GetVirtualServers()
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
assert.Len(t, virtualServers, 2, "Expected 1 virtual server")
for _, virtualServer := range virtualServers {
if virtualServer.Address.String() == "172.20.1.1" {
clusterIPServer = virtualServer
}
if virtualServer.Address.String() == "1.2.3.4" {
externalIPServer = virtualServer
}
}
realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 0, "Expected 0 real servers")
realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 0, "Expected 0 real servers")
}

View File

@ -32,7 +32,7 @@ func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[s
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
return filterEndpointsInternalTrafficPolicy(svcInfo.InternalTrafficPolicy(), endpoints)
return FilterLocalEndpoint(endpoints)
}
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
@ -85,20 +85,8 @@ func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, node
return filteredEndpoints
}
// filterEndpointsInternalTrafficPolicy returns the node local endpoints based
// on configured InternalTrafficPolicy.
//
// If ServiceInternalTrafficPolicy feature gate is off, returns the original
// EndpointSlice.
// Otherwise, if InternalTrafficPolicy is Local, only return the node local endpoints.
func filterEndpointsInternalTrafficPolicy(internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType, endpoints []Endpoint) []Endpoint {
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
return endpoints
}
if internalTrafficPolicy == nil || *internalTrafficPolicy == v1.ServiceInternalTrafficPolicyCluster {
return endpoints
}
// FilterLocalEndpoint returns the node local endpoints
func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint {
var filteredEndpoints []Endpoint
// Get all the local endpoints
@ -108,7 +96,5 @@ func filterEndpointsInternalTrafficPolicy(internalTrafficPolicy *v1.ServiceInter
}
}
// When internalTrafficPolicy is Local, only return the node local
// endpoints
return filteredEndpoints
}

View File

@ -335,93 +335,50 @@ func Test_filterEndpointsWithHints(t *testing.T) {
}
}
func Test_filterEndpointsInternalTrafficPolicy(t *testing.T) {
cluster := v1.ServiceInternalTrafficPolicyCluster
local := v1.ServiceInternalTrafficPolicyLocal
func TestFilterLocalEndpoint(t *testing.T) {
testCases := []struct {
name string
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
endpoints []Endpoint
expected []Endpoint
featureGateOn bool
name string
endpoints []Endpoint
expected []Endpoint
}{
{
name: "no internalTrafficPolicy with empty endpoints",
internalTrafficPolicy: nil,
endpoints: []Endpoint{},
expected: []Endpoint{},
featureGateOn: true,
name: "with empty endpoints",
endpoints: []Endpoint{},
expected: nil,
},
{
name: "no internalTrafficPolicy with non-empty endpoints",
internalTrafficPolicy: nil,
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
},
expected: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
},
featureGateOn: true,
},
{
name: "internalTrafficPolicy is cluster",
internalTrafficPolicy: &cluster,
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
},
expected: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
},
featureGateOn: true,
},
{
name: "internalTrafficPolicy is local with non-zero local endpoints",
internalTrafficPolicy: &local,
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
},
expected: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
},
featureGateOn: true,
},
{
name: "internalTrafficPolicy is local with zero local endpoints",
internalTrafficPolicy: &local,
name: "all endpoints not local",
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false},
},
expected: nil,
featureGateOn: true,
expected: nil,
},
{
name: "feature gate is off, internalTrafficPolicy is local with non-empty endpoints",
internalTrafficPolicy: &local,
name: "all endpoints are local",
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: true},
},
expected: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: true},
},
},
{
name: "some endpoints are local",
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
},
expected: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", IsLocal: false},
},
featureGateOn: false,
},
}
for _, tc := range testCases {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, tc.featureGateOn)()
t.Run(tc.name, func(t *testing.T) {
filteredEndpoint := filterEndpointsInternalTrafficPolicy(tc.internalTrafficPolicy, tc.endpoints)
filteredEndpoint := FilterLocalEndpoint(tc.endpoints)
if !reflect.DeepEqual(filteredEndpoint, tc.expected) {
t.Errorf("expected %v, got %v", tc.expected, filteredEndpoint)
}