Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #108691 from andrewsykim/proxy-terminating-endpoints
Apply ProxyTerminatingEndpoints to all traffic policies
Commit: 57a739bdf2
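Note for reviewers (not part of the diff): the tests below distinguish endpoints by the three EndpointSlice conditions Ready, Serving, and Terminating. The short, illustrative Go sketch below shows the two combinations that matter in this PR — a fully ready endpoint, and a terminating endpoint that is still serving and therefore only eligible as a fallback. The helper names here are hypothetical and are not taken from kube-proxy.

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
	utilpointer "k8s.io/utils/pointer"
)

// isReady reports whether an endpoint can be used directly.
func isReady(c discovery.EndpointConditions) bool {
	return c.Ready != nil && *c.Ready
}

// isServingTerminating reports whether an endpoint is only usable as a
// fallback: it is shutting down but still able to serve traffic.
func isServingTerminating(c discovery.EndpointConditions) bool {
	return c.Serving != nil && *c.Serving && c.Terminating != nil && *c.Terminating
}

func main() {
	// The same condition combinations exercised by the tests in this PR.
	ready := discovery.EndpointConditions{
		Ready:       utilpointer.BoolPtr(true),
		Serving:     utilpointer.BoolPtr(true),
		Terminating: utilpointer.BoolPtr(false),
	}
	terminating := discovery.EndpointConditions{
		Ready:       utilpointer.BoolPtr(false),
		Serving:     utilpointer.BoolPtr(true),
		Terminating: utilpointer.BoolPtr(true),
	}
	fmt.Println(isReady(ready), isServingTerminating(ready))             // true false
	fmt.Println(isReady(terminating), isServingTerminating(terminating)) // false true
}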
@ -4558,9 +4558,9 @@ COMMIT
}
}

// Test_EndpointSliceWithTerminatingEndpoints tests that when there are local ready and ready + terminating
// Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyLocal tests that when there are local ready and ready + terminating
// endpoints, only the ready endpoints are used.
func Test_EndpointSliceWithTerminatingEndpoints(t *testing.T) {
func Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyLocal(t *testing.T) {
tcpProtocol := v1.ProtocolTCP
timeout := v1.DefaultClientIPServiceAffinitySeconds
service := &v1.Service{
@ -5106,6 +5106,7 @@ COMMIT
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
@ -5114,8 +5115,12 @@ COMMIT
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-XLB-AQI2S6QIMU7PVVRP
@ -5224,6 +5229,622 @@ COMMIT
}
|
||||
}
|
||||
|
||||
// Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyCluster tests that when there are cluster-wide ready and ready + terminating
|
||||
// endpoints, only the ready endpoints are used.
|
||||
func Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyCluster(t *testing.T) {
|
||||
tcpProtocol := v1.ProtocolTCP
|
||||
timeout := v1.DefaultClientIPServiceAffinitySeconds
|
||||
service := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.30.1.1",
|
||||
Type: v1.ServiceTypeLoadBalancer,
|
||||
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster,
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "",
|
||||
TargetPort: intstr.FromInt(80),
|
||||
Port: 80,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
HealthCheckNodePort: 30000,
|
||||
SessionAffinity: v1.ServiceAffinityClientIP,
|
||||
SessionAffinityConfig: &v1.SessionAffinityConfig{
|
||||
ClientIP: &v1.ClientIPConfig{
|
||||
TimeoutSeconds: &timeout,
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: v1.ServiceStatus{
|
||||
LoadBalancer: v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{
|
||||
{IP: "1.2.3.4"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testcases := []struct {
|
||||
name string
|
||||
terminatingFeatureGate bool
|
||||
endpointslice *discovery.EndpointSlice
|
||||
expectedIPTables string
|
||||
noUsableEndpoints bool
|
||||
}{
|
||||
{
|
||||
name: "feature gate ProxyTerminatingEndpoints enabled, ready endpoints exist",
|
||||
terminatingFeatureGate: true,
|
||||
endpointslice: &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", "svc1"),
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
Addresses: []string{"10.0.1.1"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
// this endpoint should be ignored since there are ready non-terminating endpoints
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
// this endpoint should be ignored since it is not "serving"
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedIPTables: `
|
||||
*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
|
||||
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
|
||||
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "feature gate ProxyTerminatingEndpoints disabled, ready endpoints exist",
|
||||
terminatingFeatureGate: false,
|
||||
endpointslice: &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", "svc1"),
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
Addresses: []string{"10.0.1.1"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
// always ignored since feature gate is disabled
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
// always ignored since serving=false
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedIPTables: `
|
||||
*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
|
||||
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
|
||||
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "feature gate ProxyTerminatingEndpoints enabled, only terminating endpoints exist",
|
||||
terminatingFeatureGate: true,
|
||||
endpointslice: &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", "svc1"),
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
// this endpoint should be used since there are only ready terminating endpoints
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
// this endpoint should be used since there are only ready terminating endpoints
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
// this endpoint should not be used since it is both terminating and not ready.
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
// this endpoint should be used since there are only ready terminating endpoints
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedIPTables: `
|
||||
*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
|
||||
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
|
||||
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --rcheck --seconds 10800 --reap -j KUBE-SEP-XGJFVO3L2O5SRFNT
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-XGJFVO3L2O5SRFNT
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
|
||||
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "with ProxyTerminatingEndpoints disabled, only terminating endpoints exist",
|
||||
terminatingFeatureGate: false,
|
||||
endpointslice: &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", "svc1"),
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
Addresses: []string{"10.0.1.1"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
},
|
||||
},
|
||||
noUsableEndpoints: true,
|
||||
expectedIPTables: `
|
||||
*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT
|
||||
-A KUBE-EXTERNAL-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j REJECT
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "ProxyTerminatingEndpoints enabled, terminating endpoints on remote node",
|
||||
terminatingFeatureGate: true,
|
||||
endpointslice: &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", "svc1"),
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("host-1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedIPTables: `
|
||||
*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "no usable endpoints on any node",
|
||||
terminatingFeatureGate: true,
|
||||
endpointslice: &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", "svc1"),
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
// Local, not ready or serving
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
// Remote, not ready or serving
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("host-1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
noUsableEndpoints: true,
|
||||
expectedIPTables: `
|
||||
*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT
|
||||
-A KUBE-EXTERNAL-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j REJECT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testcase := range testcases {
|
||||
t.Run(testcase.name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, testcase.terminatingFeatureGate)()
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt)
|
||||
fp.OnServiceSynced()
|
||||
fp.OnEndpointSlicesSynced()
|
||||
|
||||
fp.OnServiceAdd(service)
|
||||
|
||||
fp.OnEndpointSliceAdd(testcase.endpointslice)
|
||||
fp.syncProxyRules()
|
||||
t.Log(fp.iptablesData.String())
|
||||
assertIPTablesRulesEqual(t, testcase.expectedIPTables, fp.iptablesData.String())
|
||||
|
||||
fp.OnEndpointSliceDelete(testcase.endpointslice)
|
||||
fp.syncProxyRules()
|
||||
if testcase.noUsableEndpoints {
|
||||
// Deleting the EndpointSlice should have had no effect
|
||||
assertIPTablesRulesEqual(t, testcase.expectedIPTables, fp.iptablesData.String())
|
||||
} else {
|
||||
assertIPTablesRulesNotEqual(t, testcase.expectedIPTables, fp.iptablesData.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMasqueradeAll(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt)
|
||||
|
@ -4835,6 +4835,183 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test_EndpointSliceReadyAndTerminatingCluster tests that when there are ready and ready + terminating
|
||||
// endpoints and the traffic policy is "Cluster", only the ready endpoints are used.
|
||||
func Test_EndpointSliceReadyAndTerminatingCluster(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)()
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
|
||||
fp.servicesSynced = true
|
||||
// fp.endpointsSynced = true
|
||||
fp.endpointSlicesSynced = true
|
||||
|
||||
clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster
|
||||
|
||||
serviceName := "svc1"
|
||||
// Add initial service
|
||||
namespaceName := "ns1"
|
||||
fp.OnServiceAdd(&v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.20.1.1",
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Type: v1.ServiceTypeNodePort,
|
||||
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster,
|
||||
InternalTrafficPolicy: &clusterInternalTrafficPolicy,
|
||||
ExternalIPs: []string{
|
||||
"1.2.3.4",
|
||||
},
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "",
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt(80),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Add initial endpoint slice
|
||||
tcpProtocol := v1.ProtocolTCP
|
||||
endpointSlice := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", serviceName),
|
||||
Namespace: namespaceName,
|
||||
Labels: map[string]string{discovery.LabelServiceName: serviceName},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
Addresses: []string{"10.0.1.1"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
|
||||
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
|
||||
assert.Equal(t, 4, activeEntries1.Len(), "Expected 4 active entries in KUBE-LOOP-BACK")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first pod")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second pod")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference third pod")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference fourth pod")
|
||||
|
||||
virtualServers, vsErr := ipvs.GetVirtualServers()
|
||||
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
|
||||
assert.Len(t, virtualServers, 2, "Expected 2 virtual servers")
|
||||
|
||||
var clusterIPServer, externalIPServer *utilipvs.VirtualServer
|
||||
for _, virtualServer := range virtualServers {
|
||||
if virtualServer.Address.String() == "172.20.1.1" {
|
||||
clusterIPServer = virtualServer
|
||||
}
|
||||
|
||||
if virtualServer.Address.String() == "1.2.3.4" {
|
||||
externalIPServer = virtualServer
|
||||
}
|
||||
}
|
||||
|
||||
// clusterIP should route to cluster-wide ready endpoints
|
||||
realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
|
||||
assert.Nil(t, rsErr1, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers1, 3, "Expected 3 real servers")
|
||||
assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
|
||||
assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
|
||||
assert.Equal(t, realServers1[2].String(), "10.0.1.5:80")
|
||||
|
||||
// externalIP should route to cluster-wide ready endpoints
|
||||
realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 3, "Expected 3 real servers")
|
||||
assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
|
||||
assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
|
||||
assert.Equal(t, realServers2[2].String(), "10.0.1.5:80")
|
||||
|
||||
fp.OnEndpointSliceDelete(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
|
||||
activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
|
||||
assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
|
||||
|
||||
virtualServers, vsErr = ipvs.GetVirtualServers()
|
||||
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
|
||||
assert.Len(t, virtualServers, 2, "Expected 2 virtual servers")
|
||||
|
||||
for _, virtualServer := range virtualServers {
|
||||
if virtualServer.Address.String() == "172.20.1.1" {
|
||||
clusterIPServer = virtualServer
|
||||
}
|
||||
|
||||
if virtualServer.Address.String() == "1.2.3.4" {
|
||||
externalIPServer = virtualServer
|
||||
}
|
||||
}
|
||||
|
||||
realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
|
||||
assert.Nil(t, rsErr1, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers1, 0, "Expected 0 real servers")
|
||||
|
||||
realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 0, "Expected 0 real servers")
|
||||
}
|
||||
|
||||
// Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating
|
||||
// endpoints, only the ready endpoints are used.
|
||||
func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) {
|
||||
@ -5011,6 +5188,182 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) {
|
||||
assert.Len(t, realServers2, 0, "Expected 0 real servers")
|
||||
}
|
||||
|
||||
// Test_EndpointSliceOnlyReadyAndTerminatingCluster tests that when there are only ready terminating
|
||||
// endpoints and the traffic policy is "Cluster", we fall back to terminating endpoints.
|
||||
func Test_EndpointSliceOnlyReadyAndTerminatingCluster(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)()
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
|
||||
fp.servicesSynced = true
|
||||
// fp.endpointsSynced = true
|
||||
fp.endpointSlicesSynced = true
|
||||
|
||||
clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster
|
||||
|
||||
// Add initial service
|
||||
serviceName := "svc1"
|
||||
namespaceName := "ns1"
|
||||
fp.OnServiceAdd(&v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.20.1.1",
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Type: v1.ServiceTypeNodePort,
|
||||
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster,
|
||||
InternalTrafficPolicy: &clusterInternalTrafficPolicy,
|
||||
ExternalIPs: []string{
|
||||
"1.2.3.4",
|
||||
},
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "",
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt(80),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Add initial endpoint slice
|
||||
tcpProtocol := v1.ProtocolTCP
|
||||
endpointSlice := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", serviceName),
|
||||
Namespace: namespaceName,
|
||||
Labels: map[string]string{discovery.LabelServiceName: serviceName},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{
|
||||
{
|
||||
Addresses: []string{"10.0.1.1"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
{
|
||||
Addresses: []string{"10.0.1.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("another-host"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
|
||||
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
|
||||
assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entries in KUBE-LOOP-BACK")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
|
||||
assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
|
||||
|
||||
virtualServers, vsErr := ipvs.GetVirtualServers()
|
||||
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
|
||||
assert.Len(t, virtualServers, 2, "Expected 2 virtual servers")
|
||||
|
||||
var clusterIPServer, externalIPServer *utilipvs.VirtualServer
|
||||
for _, virtualServer := range virtualServers {
|
||||
if virtualServer.Address.String() == "172.20.1.1" {
|
||||
clusterIPServer = virtualServer
|
||||
}
|
||||
|
||||
if virtualServer.Address.String() == "1.2.3.4" {
|
||||
externalIPServer = virtualServer
|
||||
}
|
||||
}
|
||||
|
||||
// clusterIP should fall back to cluster-wide ready + terminating endpoints
|
||||
realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
|
||||
assert.Nil(t, rsErr1, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers1, 3, "Expected 3 real servers")
|
||||
assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
|
||||
assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
|
||||
assert.Equal(t, realServers1[2].String(), "10.0.1.4:80")
|
||||
|
||||
// externalIP should fall back to ready + terminating endpoints
|
||||
realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 3, "Expected 3 real servers")
|
||||
assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
|
||||
assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
|
||||
assert.Equal(t, realServers2[2].String(), "10.0.1.4:80")
|
||||
|
||||
fp.OnEndpointSliceDelete(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
|
||||
activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
|
||||
assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
|
||||
|
||||
virtualServers, vsErr = ipvs.GetVirtualServers()
|
||||
assert.Nil(t, vsErr, "Expected no error getting virtual servers")
|
||||
assert.Len(t, virtualServers, 2, "Expected 2 virtual servers")
|
||||
|
||||
for _, virtualServer := range virtualServers {
|
||||
if virtualServer.Address.String() == "172.20.1.1" {
|
||||
clusterIPServer = virtualServer
|
||||
}
|
||||
|
||||
if virtualServer.Address.String() == "1.2.3.4" {
|
||||
externalIPServer = virtualServer
|
||||
}
|
||||
}
|
||||
|
||||
realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
|
||||
assert.Nil(t, rsErr1, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers1, 0, "Expected 0 real servers")
|
||||
|
||||
realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 0, "Expected 0 real servers")
|
||||
}
|
||||
|
||||
// Test_EndpointSliceOnlyReadyAndTerminatingLocal tests that when there are only local ready terminating
|
||||
// endpoints, we fall back to those endpoints.
|
||||
func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) {
|
||||
|
@ -52,6 +52,19 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
return true
})

// if there are 0 cluster-wide endpoints, we can try to fallback to any terminating endpoints that are ready.
// When falling back to terminating endpoints, we do NOT consider topology aware routing since this is a best
// effort attempt to avoid dropping connections.
if len(clusterEndpoints) == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
if ep.IsServing() && ep.IsTerminating() {
return true
}

return false
})
}

// If there are any Ready endpoints anywhere in the cluster, we are
// guaranteed to get one in clusterEndpoints.
if len(clusterEndpoints) > 0 {
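The hunk above is the core behavioral change: ready endpoints are always preferred, and serving-but-terminating endpoints are considered only when no ready endpoint exists cluster-wide and the ProxyTerminatingEndpoints gate is on. A self-contained sketch of that selection rule follows (simplified; the endpoint struct and function names are hypothetical stand-ins, not kube-proxy's actual types).

package main

import "fmt"

// endpoint is a hypothetical stand-in for the proxy's Endpoint interface.
type endpoint struct {
	addr        string
	ready       bool
	serving     bool
	terminating bool
}

// selectClusterEndpoints prefers ready endpoints and, only when none exist and
// the ProxyTerminatingEndpoints gate is enabled, falls back to endpoints that
// are still serving while terminating.
func selectClusterEndpoints(eps []endpoint, terminatingGateEnabled bool) []endpoint {
	var out []endpoint
	for _, ep := range eps {
		if ep.ready {
			out = append(out, ep)
		}
	}
	if len(out) == 0 && terminatingGateEnabled {
		for _, ep := range eps {
			if ep.serving && ep.terminating {
				out = append(out, ep)
			}
		}
	}
	return out
}

func main() {
	eps := []endpoint{
		{addr: "10.0.1.1:80", ready: false, serving: true, terminating: true},
		{addr: "10.0.1.2:80", ready: false, serving: true, terminating: true},
		{addr: "10.0.1.3:80", ready: false, serving: false, terminating: true},
	}
	// With no ready endpoints, only the serving+terminating ones are selected.
	fmt.Println(selectClusterEndpoints(eps, true))
}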
@ -350,7 +350,17 @@ func TestCategorizeEndpoints(t *testing.T) {
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: nil,
}, {
name: "Cluster traffic policy, PTE disabled, all endpoints are terminating",
pteEnabled: false,
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: nil,
@ -412,9 +422,9 @@ func TestCategorizeEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString(),
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString(),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
onlyRemoteEndpoints: true,
}, {
name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints",
@ -52,6 +52,7 @@ import (
|
||||
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
netutils "k8s.io/utils/net"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
|
||||
@ -2309,6 +2310,456 @@ var _ = common.SIGDescribe("Services", func() {
|
||||
}
|
||||
})
|
||||
|
||||
ginkgo.It("should fail health check node port if there are only terminating endpoints [Feature:ProxyTerminatingEndpoints]", func() {
|
||||
// windows kube-proxy does not support this feature yet
|
||||
e2eskipper.SkipIfNodeOSDistroIs("windows")
|
||||
|
||||
// This behavior is not supported if Kube-proxy is in "userspace" mode.
|
||||
// So we check the kube-proxy mode and skip this test if that's the case.
|
||||
if proxyMode, err := proxyMode(f); err == nil {
|
||||
if proxyMode == "userspace" {
|
||||
e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
|
||||
}
|
||||
} else {
|
||||
framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
|
||||
}
|
||||
|
||||
nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
|
||||
framework.ExpectNoError(err)
|
||||
nodeCounts := len(nodes.Items)
|
||||
if nodeCounts < 2 {
|
||||
e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
|
||||
}
|
||||
node0 := nodes.Items[0]
|
||||
|
||||
serviceName := "svc-proxy-terminating"
|
||||
ns := f.Namespace.Name
|
||||
servicePort := 80
|
||||
|
||||
ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns)
|
||||
jig := e2eservice.NewTestJig(cs, ns, serviceName)
|
||||
svc, err := jig.CreateTCPService(func(svc *v1.Service) {
|
||||
svc.Spec.Ports = []v1.ServicePort{
|
||||
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
|
||||
}
|
||||
svc.Spec.Type = v1.ServiceTypeLoadBalancer
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
|
||||
webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
|
||||
webserverPod0.Labels = jig.Labels
|
||||
webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
|
||||
e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
|
||||
|
||||
pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
|
||||
healthCheckNodePortAddr := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.HealthCheckNodePort)))
|
||||
// validate that the health check node port from kube-proxy returns 200 when there are ready endpoints
|
||||
err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
|
||||
cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --connect-timeout 5 http://%s/healthz`, healthCheckNodePortAddr)
|
||||
out, err := framework.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
expectedOut := "200"
|
||||
if out != expectedOut {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// webserver should continue to serve traffic through the Service after deletion, even though the health check node port should return 503
|
||||
ginkgo.By("Terminating the webserver pod")
|
||||
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// validate that the health check node port from kube-proxy returns 503 when there are no ready endpoints
|
||||
err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
|
||||
cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --connect-timeout 5 http://%s/healthz`, healthCheckNodePortAddr)
|
||||
out, err := framework.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
expectedOut := "503"
|
||||
if out != expectedOut {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// also verify that while health check node port indicates 0 endpoints and returns 503, the endpoint still serves traffic.
|
||||
nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
|
||||
execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name)
|
||||
})

ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Cluster [Feature:ProxyTerminatingEndpoints]", func() {
	// windows kube-proxy does not support this feature yet
	e2eskipper.SkipIfNodeOSDistroIs("windows")

	// This behavior is not supported if kube-proxy is in "userspace" mode.
	// So we check the kube-proxy mode and skip this test if that's the case.
	if proxyMode, err := proxyMode(f); err == nil {
		if proxyMode == "userspace" {
			e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
		}
	} else {
		framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
	}

	nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
	framework.ExpectNoError(err)
	nodeCounts := len(nodes.Items)
	if nodeCounts < 2 {
		e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
	}
	node0 := nodes.Items[0]
	node1 := nodes.Items[1]

	serviceName := "svc-proxy-terminating"
	ns := f.Namespace.Name
	servicePort := 80

	ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating in namespace " + ns)
	jig := e2eservice.NewTestJig(cs, ns, serviceName)
	svc, err := jig.CreateTCPService(func(svc *v1.Service) {
		svc.Spec.Ports = []v1.ServicePort{
			{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
		}
	})
	framework.ExpectNoError(err)
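
	// Note: spec.internalTrafficPolicy is left unset here, so it defaults to "Cluster": cluster IP
	// traffic from any node can be routed to any endpoint of the Service. The point of this test is
	// that, with the ProxyTerminatingEndpoints behavior, kube-proxy should fall back to an endpoint
	// that is terminating but still serving once no ready endpoints remain.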
ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
|
||||
webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
|
||||
webserverPod0.Labels = jig.Labels
|
||||
webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
|
||||
e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
|
||||
|
||||
ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
|
||||
pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
|
||||
|
||||
pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
// webserver should continue to serve traffic through the Service after delete since:
|
||||
// - it has a 600s termination grace period
|
||||
// - it is the only ready endpoint
|
||||
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// assert 5 times that both the local and remote pod can connect to the Service while all endpoints are terminating
|
||||
serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
|
||||
for i := 0; i < 5; i++ {
|
||||
// There's a Service with internalTrafficPolicy=Cluster,
|
||||
// with a single endpoint (which is terminating) called webserver0 running on node0.
|
||||
// pausePod0 and pausePod1 are on node0 and node1 respectively.
|
||||
// pausePod0 -> Service clusterIP succeeds because traffic policy is "Cluster"
|
||||
// pausePod1 -> Service clusterIP succeeds because traffic policy is "Cluster"
|
||||
execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
|
||||
execHostnameTest(*pausePod1, serviceAddress, webserverPod0.Name)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
})

ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Local [Feature:ProxyTerminatingEndpoints]", func() {
	// windows kube-proxy does not support this feature yet
	e2eskipper.SkipIfNodeOSDistroIs("windows")

	// This behavior is not supported if kube-proxy is in "userspace" mode.
	// So we check the kube-proxy mode and skip this test if that's the case.
	if proxyMode, err := proxyMode(f); err == nil {
		if proxyMode == "userspace" {
			e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
		}
	} else {
		framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
	}

	nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
	framework.ExpectNoError(err)
	nodeCounts := len(nodes.Items)
	if nodeCounts < 2 {
		e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
	}
	node0 := nodes.Items[0]
	node1 := nodes.Items[1]

	serviceName := "svc-proxy-terminating"
	ns := f.Namespace.Name
	servicePort := 80

	ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating in namespace " + ns)
	jig := e2eservice.NewTestJig(cs, ns, serviceName)
	local := v1.ServiceInternalTrafficPolicyLocal
	svc, err := jig.CreateTCPService(func(svc *v1.Service) {
		svc.Spec.Ports = []v1.ServicePort{
			{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
		}
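		// Note: internalTrafficPolicy=Local means cluster IP traffic is only delivered to
		// endpoints on the same node as the client; there is no cross-node fallback.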
		svc.Spec.InternalTrafficPolicy = &local
	})
	framework.ExpectNoError(err)

	ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
	webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
	webserverPod0.Labels = jig.Labels
	webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
	e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

	_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
	framework.ExpectNoError(err)
	framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
	validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

	ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
	pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
	e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})

	pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
	framework.ExpectNoError(err)
	framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))

	pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
	e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

	pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
	framework.ExpectNoError(err)
	framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))

	// webserver should continue to serve traffic through the Service after deletion since:
	// - it has a 600s termination grace period
	// - it is the only ready endpoint
	err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
	framework.ExpectNoError(err)

	// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
	serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
	for i := 0; i < 5; i++ {
		// There's a Service with internalTrafficPolicy=Local,
		// with a single endpoint (which is terminating) called webserver0 running on node0.
		// pausePod0 and pausePod1 are on node0 and node1 respectively.
		// pausePod0 -> Service clusterIP succeeds because webserver0 is running on node0 and traffic policy is "Local"
		// pausePod1 -> Service clusterIP fails because webserver0 is on a different node and traffic policy is "Local"
		execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
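
		// Note: --connect-timeout 5 keeps this curl from hanging; from node1 the connection is
		// expected to time out rather than be refused, since node1 has no local endpoint for
		// the Service.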
		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
		_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
		framework.ExpectError(err, "expected error when trying to connect to cluster IP")

		time.Sleep(5 * time.Second)
	}
})

ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Cluster [Feature:ProxyTerminatingEndpoints]", func() {
	// windows kube-proxy does not support this feature yet
	e2eskipper.SkipIfNodeOSDistroIs("windows")

	// This behavior is not supported if kube-proxy is in "userspace" mode.
	// So we check the kube-proxy mode and skip this test if that's the case.
	if proxyMode, err := proxyMode(f); err == nil {
		if proxyMode == "userspace" {
			e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
		}
	} else {
		framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
	}

	nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
	framework.ExpectNoError(err)
	nodeCounts := len(nodes.Items)
	if nodeCounts < 2 {
		e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
	}
	node0 := nodes.Items[0]
	node1 := nodes.Items[1]

	serviceName := "svc-proxy-terminating"
	ns := f.Namespace.Name
	servicePort := 80

	ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating in namespace " + ns)
	jig := e2eservice.NewTestJig(cs, ns, serviceName)
	svc, err := jig.CreateTCPService(func(svc *v1.Service) {
		svc.Spec.Ports = []v1.ServicePort{
			{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
		}
		svc.Spec.Type = v1.ServiceTypeNodePort
	})
	framework.ExpectNoError(err)
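
	// Note: Type=NodePort with externalTrafficPolicy left at its default of "Cluster" means
	// traffic arriving on any node's port may be forwarded (and SNAT'd) to an endpoint on any
	// node, so both pause pods should reach the terminating endpoint through node0's port.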
ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
|
||||
webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
|
||||
webserverPod0.Labels = jig.Labels
|
||||
webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
|
||||
e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
|
||||
|
||||
ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
|
||||
pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
|
||||
|
||||
pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
// webserver should continue to serve traffic through the Service after delete since:
|
||||
// - it has a 600s termination grace period
|
||||
// - it is the only ready endpoint
|
||||
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// assert 5 times that both the local and remote pod can connect to the Service NodePort while all endpoints are terminating
|
||||
nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
|
||||
nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
|
||||
for i := 0; i < 5; i++ {
|
||||
// There's a Service Type=NodePort with externalTrafficPolicy=Cluster,
|
||||
// with a single endpoint (which is terminating) called webserver0 running on node0.
|
||||
// pausePod0 and pausePod1 are on node0 and node1 respectively.
|
||||
// pausePod0 -> node0 node port succeeds because webserver0 is running on node0 and traffic policy is "Cluster"
|
||||
// pausePod1 -> node0 node port succeeds because webserver0 is running on node0 and traffic policy is "Cluster"
|
||||
execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name)
|
||||
execHostnameTest(*pausePod1, nodePortAddress, webserverPod0.Name)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
})

ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Local [Feature:ProxyTerminatingEndpoints]", func() {
	// windows kube-proxy does not support this feature yet
	e2eskipper.SkipIfNodeOSDistroIs("windows")

	// This behavior is not supported if kube-proxy is in "userspace" mode.
	// So we check the kube-proxy mode and skip this test if that's the case.
	if proxyMode, err := proxyMode(f); err == nil {
		if proxyMode == "userspace" {
			e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
		}
	} else {
		framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
	}

	nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
	framework.ExpectNoError(err)
	nodeCounts := len(nodes.Items)
	if nodeCounts < 2 {
		e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
	}
	node0 := nodes.Items[0]
	node1 := nodes.Items[1]

	serviceName := "svc-proxy-terminating"
	ns := f.Namespace.Name
	servicePort := 80

	ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating in namespace " + ns)
	jig := e2eservice.NewTestJig(cs, ns, serviceName)
	svc, err := jig.CreateTCPService(func(svc *v1.Service) {
		svc.Spec.Ports = []v1.ServicePort{
			{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
		}
		svc.Spec.Type = v1.ServiceTypeNodePort
		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
	})
	framework.ExpectNoError(err)
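
	// Note: with externalTrafficPolicy=Local, node-port traffic is only delivered to endpoints
	// on the node that received it (preserving the client source IP); a node with no local
	// endpoints does not forward its node port to endpoints elsewhere.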
ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
|
||||
webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
|
||||
webserverPod0.Labels = jig.Labels
|
||||
webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
|
||||
e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
|
||||
|
||||
ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
|
||||
pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
|
||||
|
||||
pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
|
||||
e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
|
||||
|
||||
pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
// webserver should continue to serve traffic through the Service after delete since:
|
||||
// - it has a 600s termination grace period
|
||||
// - it is the only ready endpoint
|
||||
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
|
||||
nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
|
||||
nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
|
||||
nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
|
||||
nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
|
||||
for i := 0; i < 5; i++ {
|
||||
// There's a Service Type=NodePort with externalTrafficPolicy=Local,
|
||||
// with a single endpoint (which is terminating) called webserver0 running on node0.
|
||||
// pausePod0 and pausePod1 are on node0 and node1 respectively.
|
||||
// pausePod0 -> node1 node port fails because it's "external" and there are no local endpoints
|
||||
// pausePod1 -> node0 node port succeeds because webserver0 is running on node0
|
||||
// pausePod0 -> node0 and pausePod1 -> node1 both succeed because pod-to-same-node-NodePort
|
||||
// connections are neither internal nor external and always get Cluster traffic policy.
|
||||
cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
|
||||
_, err := framework.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
|
||||
framework.ExpectError(err, "expected error when trying to connect to node port for pausePod0")
|
||||
|
||||
execHostnameTest(*pausePod0, nodePortAddress0, webserverPod0.Name)
|
||||
execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
|
||||
execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
})

/*
	Release: v1.18
	Testname: Find Kubernetes Service in default Namespace

@@ -231,7 +231,7 @@ const (

 func initImageConfigs(list RegistryList) (map[int]Config, map[int]Config) {
 	configs := map[int]Config{}
-	configs[Agnhost] = Config{list.PromoterE2eRegistry, "agnhost", "2.33"}
+	configs[Agnhost] = Config{list.PromoterE2eRegistry, "agnhost", "2.36"}
 	configs[AgnhostPrivate] = Config{list.PrivateRegistry, "agnhost", "2.6"}
 	configs[AuthenticatedAlpine] = Config{list.GcAuthenticatedRegistry, "alpine", "3.7"}
 	configs[AuthenticatedWindowsNanoServer] = Config{list.GcAuthenticatedRegistry, "windows-nanoserver", "v1"}