diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 543abcd24f5..3180c92baa6 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -4558,9 +4558,9 @@ COMMIT } } -// Test_EndpointSliceWithTerminatingEndpoints tests that when there are local ready and ready + terminating +// Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyLocal tests that when there are local ready and ready + terminating // endpoints, only the ready endpoints are used. -func Test_EndpointSliceWithTerminatingEndpoints(t *testing.T) { +func Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyLocal(t *testing.T) { tcpProtocol := v1.ProtocolTCP timeout := v1.DefaultClientIPServiceAffinitySeconds service := &v1.Service{ @@ -5106,6 +5106,7 @@ COMMIT :KUBE-SERVICES - [0:0] :KUBE-NODEPORTS - [0:0] :KUBE-POSTROUTING - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] @@ -5114,8 +5115,12 @@ COMMIT -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 -A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY -A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP -A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-XLB-AQI2S6QIMU7PVVRP @@ -5224,6 +5229,622 @@ COMMIT } } +// Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyCluster tests that when there are cluster-wide ready and ready + terminating +// endpoints, only the ready endpoints are used. 
+func Test_EndpointSliceWithTerminatingEndpointsTrafficPolicyCluster(t *testing.T) { + tcpProtocol := v1.ProtocolTCP + timeout := v1.DefaultClientIPServiceAffinitySeconds + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.30.1.1", + Type: v1.ServiceTypeLoadBalancer, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster, + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{ + { + Name: "", + TargetPort: intstr.FromInt(80), + Port: 80, + Protocol: v1.ProtocolTCP, + }, + }, + HealthCheckNodePort: 30000, + SessionAffinity: v1.ServiceAffinityClientIP, + SessionAffinityConfig: &v1.SessionAffinityConfig{ + ClientIP: &v1.ClientIPConfig{ + TimeoutSeconds: &timeout, + }, + }, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: "1.2.3.4"}, + }, + }, + }, + } + + testcases := []struct { + name string + terminatingFeatureGate bool + endpointslice *discovery.EndpointSlice + expectedIPTables string + noUsableEndpoints bool + }{ + { + name: "feature gate ProxyTerminatingEndpoints enabled, ready endpoints exist", + terminatingFeatureGate: true, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + // this endpoint should be ignored since there are ready non-terminating endpoints + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + // this endpoint should be ignored since it is not "serving" + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + }, + }, + expectedIPTables: ` +*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat 
+:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +`, + }, + { + name: "feature gate ProxyTerminatingEndpoints disabled, ready endpoints exist", + terminatingFeatureGate: false, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: 
[]discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + // always ignored since feature gate is disabled + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + // always ignored since serving=false + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + }, + }, + expectedIPTables: ` +*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ +-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +`, + }, + { + name: "feature gate ProxyTerminatingEndpoints enabled, only terminating endpoints exist", + terminatingFeatureGate: true, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + // this endpoint should be used since there are only ready terminating endpoints + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + // this endpoint should be used since there are only ready terminating endpoints + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: 
utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + // this endpoint should not be used since it is both terminating and not ready. + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + // this endpoint should be used since there are only ready terminating endpoints + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + }, + }, + expectedIPTables: ` +*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] +:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --rcheck --seconds 10800 --reap -j KUBE-SEP-XGJFVO3L2O5SRFNT +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-XGJFVO3L2O5SRFNT +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2 -j KUBE-MARK-MASQ +-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3 -j KUBE-MARK-MASQ +-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +`, + }, + { + name: "with ProxyTerminatingEndpoints disabled, only terminating endpoints exist", + terminatingFeatureGate: false, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: 
utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + }, + }, + noUsableEndpoints: true, + expectedIPTables: ` +*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT +-A KUBE-EXTERNAL-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j REJECT +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +`, + }, + { + name: "ProxyTerminatingEndpoints enabled, terminating endpoints on remote node", + terminatingFeatureGate: true, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("host-1"), + }, + }, + }, + expectedIPTables: ` +*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] +:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] +-A KUBE-POSTROUTING -m mark ! 
--mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --rcheck --seconds 10800 --reap -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-EQCHZ7S2PJ72OHAY +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-SERVICES -m comment --comment "ns1/svc1 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-FW-AQI2S6QIMU7PVVRP +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -s 10.0.1.5 -j KUBE-MARK-MASQ +-A KUBE-SEP-EQCHZ7S2PJ72OHAY -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-EQCHZ7S2PJ72OHAY --set -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:80 +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-MASQ +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-SVC-AQI2S6QIMU7PVVRP +-A KUBE-FW-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 loadbalancer IP" -j KUBE-MARK-DROP +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +`, + }, + { + name: "no usable endpoints on any node", + terminatingFeatureGate: true, + endpointslice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", "svc1"), + Namespace: "ns1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + // Local, not ready or serving + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + // Remote, not ready or serving + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("host-1"), + }, + }, + }, + noUsableEndpoints: true, + expectedIPTables: ` +*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +:KUBE-NODEPORTS - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT +-A KUBE-EXTERNAL-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j REJECT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +-A KUBE-POSTROUTING -m mark ! 
--mark 0x4000/0x4000 -j RETURN +-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +`, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, testcase.terminatingFeatureGate)() + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt) + fp.OnServiceSynced() + fp.OnEndpointSlicesSynced() + + fp.OnServiceAdd(service) + + fp.OnEndpointSliceAdd(testcase.endpointslice) + fp.syncProxyRules() + t.Log(fp.iptablesData.String()) + assertIPTablesRulesEqual(t, testcase.expectedIPTables, fp.iptablesData.String()) + + fp.OnEndpointSliceDelete(testcase.endpointslice) + fp.syncProxyRules() + if testcase.noUsableEndpoints { + // Deleting the EndpointSlice should have had no effect + assertIPTablesRulesEqual(t, testcase.expectedIPTables, fp.iptablesData.String()) + } else { + assertIPTablesRulesNotEqual(t, testcase.expectedIPTables, fp.iptablesData.String()) + } + }) + } +} + func TestMasqueradeAll(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 33d8a7e7e40..210a00cdf94 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -4835,6 +4835,183 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) { } } +// Test_EndpointSliceReadyAndTerminatingCluster tests that when there are ready and ready + terminating +// endpoints and the traffic policy is "Cluster", only the ready endpoints are used. 
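A note on the expected *nat output in the fixtures above: the --seconds 10800 value comes from the service's v1.DefaultClientIPServiceAffinitySeconds timeout used by these tests, and the per-endpoint -m statistic rules implement kube-proxy's equal-weight balancing, where endpoint i of n is matched with probability 1/(n-i) and the last endpoint gets an unconditional jump. A small sketch reproducing that probability sequence (the rule text printed here is illustrative, not the proxier's exact formatting):

```go
package main

import "fmt"

// Illustrative only: reproduces the probability sequence seen in the expected
// iptables rules above. With n endpoints, endpoint i (0-based) is matched with
// probability 1/(n-i), so each endpoint ends up with 1/n of the traffic; the
// final endpoint gets an unconditional jump instead of a statistic match.
func main() {
	n := 3
	for i := 0; i < n; i++ {
		if i == n-1 {
			fmt.Printf("endpoint %d: unconditional -j KUBE-SEP-...\n", i)
			continue
		}
		fmt.Printf("endpoint %d: -m statistic --mode random --probability %.10f\n", i, 1.0/float64(n-i))
	}
}
```

Running this prints 0.3333333333 and 0.5000000000 for the first two endpoints, matching the three-endpoint expectations above.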
+func Test_EndpointSliceReadyAndTerminatingCluster(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)() + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) + fp.servicesSynced = true + // fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster + + serviceName := "svc1" + // Add initial service + namespaceName := "ns1" + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, + ExternalIPs: []string{ + "1.2.3.4", + }, + Ports: []v1.ServicePort{ + { + Name: "", + Port: 80, + TargetPort: intstr.FromInt(80), + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice update + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 4, activeEntries1.Len(), "Expected 4 active entry in KUBE-LOOP-BACK") + assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second 
pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference third pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference fourth pod") + + virtualServers, vsErr := ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 2 virtual server") + + var clusterIPServer, externalIPServer *utilipvs.VirtualServer + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + // clusterIP should route to cluster-wide ready endpoints + realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 3, "Expected 3 real servers") + assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") + assert.Equal(t, realServers1[2].String(), "10.0.1.5:80") + + // externalIP should route to cluster-wide ready endpoints + realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 3, "Expected 3 real servers") + assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") + assert.Equal(t, realServers1[2].String(), "10.0.1.5:80") + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") + + virtualServers, vsErr = ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 1 virtual server") + + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 0, "Expected 0 real servers") + + realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") +} + // Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating // endpoints, only the ready endpoints are used. func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { @@ -5011,6 +5188,182 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { assert.Len(t, realServers2, 0, "Expected 0 real servers") } +// Test_EndpointSliceOnlyReadyTerminatingCluster tests that when there are only ready terminating +// endpoints and the traffic policy is "Cluster", we fall back to terminating endpoints. 
+func Test_EndpointSliceOnlyReadyAndTerminatingCluster(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, true)() + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) + fp.servicesSynced = true + // fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + clusterInternalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Type: v1.ServiceTypeNodePort, + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster, + InternalTrafficPolicy: &clusterInternalTrafficPolicy, + ExternalIPs: []string{ + "1.2.3.4", + }, + Ports: []v1.ServicePort{ + { + Name: "", + Port: 80, + TargetPort: intstr.FromInt(80), + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr(testHostname), + }, + { + Addresses: []string{"10.0.1.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + { + Addresses: []string{"10.0.1.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(false), + }, + NodeName: utilpointer.StringPtr("another-host"), + }, + }, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice update + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") + assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to 
reference second (local) pod") + assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference third (local) pod") + + virtualServers, vsErr := ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 2 virtual servers") + + var clusterIPServer, externalIPServer *utilipvs.VirtualServer + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + // clusterIP should fall back to cluster-wide ready + terminating endpoints + realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 3, "Expected 3 real servers") + assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") + assert.Equal(t, realServers1[2].String(), "10.0.1.4:80") + + // externalIP should fall back to ready + terminating endpoints + realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 3, "Expected 3 real servers") + assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") + assert.Equal(t, realServers2[2].String(), "10.0.1.4:80") + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") + + virtualServers, vsErr = ipvs.GetVirtualServers() + assert.Nil(t, vsErr, "Expected no error getting virtual servers") + assert.Len(t, virtualServers, 2, "Expected 2 virtual servers") + + for _, virtualServer := range virtualServers { + if virtualServer.Address.String() == "172.20.1.1" { + clusterIPServer = virtualServer + } + + if virtualServer.Address.String() == "1.2.3.4" { + externalIPServer = virtualServer + } + } + + realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 0, "Expected 0 real servers") + + realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") +} + + // Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating + // endpoints, we fall back to those endpoints. + func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index ef16d559b1c..d68aeb40f94 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -52,6 +52,19 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m return true }) + // if there are 0 cluster-wide endpoints, we can try to fall back to any terminating endpoints that are still serving. + // When falling back to terminating endpoints, we do NOT consider topology aware routing since this is a best + // effort attempt to avoid dropping connections. 
+ if len(clusterEndpoints) == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { + clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + if ep.IsServing() && ep.IsTerminating() { + return true + } + + return false + }) + } + // If there are any Ready endpoints anywhere in the cluster, we are // guaranteed to get one in clusterEndpoints. if len(clusterEndpoints) > 0 { diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index 35c110815a2..cdab4926ccb 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -350,7 +350,17 @@ func TestCategorizeEndpoints(t *testing.T) { serviceInfo: &BaseServiceInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, - &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: nil, + }, { + name: "Cluster traffic policy, PTE disabled, all endpoints are terminating", + pteEnabled: false, + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, }, clusterEndpoints: sets.NewString(), localEndpoints: nil, @@ -412,9 +422,9 @@ func TestCategorizeEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, }, - clusterEndpoints: sets.NewString(), + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), localEndpoints: sets.NewString(), - allEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), onlyRemoteEndpoints: true, }, { name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints", diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index ab6e712bbd5..e0d7e600984 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -52,6 +52,7 @@ import ( cloudprovider "k8s.io/cloud-provider" netutils "k8s.io/utils/net" + utilpointer "k8s.io/utils/pointer" "k8s.io/kubernetes/test/e2e/framework" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" @@ -2309,6 +2310,456 @@ var _ = common.SIGDescribe("Services", func() { } }) + ginkgo.It("should fail health check node port if there are only terminating endpoints [Feature:ProxyTerminatingEndpoints]", func() { + // windows kube-proxy does not support this feature yet + e2eskipper.SkipIfNodeOSDistroIs("windows") + + // This behavior is not supported if Kube-proxy is in "userspace" mode. + // So we check the kube-proxy mode and skip this test if that's the case. 
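The topology.go hunk above is the core behavioral change: CategorizeEndpoints first collects ready endpoints, and only when none exist cluster-wide (and the ProxyTerminatingEndpoints feature gate is on) does it fall back to endpoints that are serving but terminating. A minimal standalone sketch of that selection rule, using a simplified endpoint struct rather than the real proxy.Endpoint interface and filterEndpoints helper:

```go
package main

import "fmt"

// endpoint is a simplified stand-in for the proxy Endpoint interface; only
// the conditions that drive the fallback decision are modeled here.
type endpoint struct {
	addr        string
	ready       bool
	serving     bool
	terminating bool
}

// clusterEndpointsFor sketches the two-step rule from the hunk above: prefer
// ready endpoints, and only if none exist cluster-wide (and the feature gate
// is enabled) fall back to endpoints that are still serving while terminating.
func clusterEndpointsFor(eps []endpoint, proxyTerminatingEndpoints bool) []string {
	var out []string
	for _, ep := range eps {
		if ep.ready {
			out = append(out, ep.addr)
		}
	}
	if len(out) == 0 && proxyTerminatingEndpoints {
		for _, ep := range eps {
			if ep.serving && ep.terminating {
				out = append(out, ep.addr)
			}
		}
	}
	return out
}

func main() {
	eps := []endpoint{
		{addr: "10.0.1.1:80", ready: false, serving: true, terminating: true},
		{addr: "10.0.1.2:80", ready: false, serving: false, terminating: true},
	}
	// Only the serving-but-terminating endpoint is usable: [10.0.1.1:80]
	fmt.Println(clusterEndpointsFor(eps, true))
}
```

The Local-traffic-policy variants exercised by the tests above apply the same two-step preference, just restricted to the node-local subset of endpoints.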
+ if proxyMode, err := proxyMode(f); err == nil { + if proxyMode == "userspace" { + e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode") + } + } else { + framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err) + } + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2) + framework.ExpectNoError(err) + nodeCounts := len(nodes.Items) + if nodeCounts < 2 { + e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts) + } + node0 := nodes.Items[0] + + serviceName := "svc-proxy-terminating" + ns := f.Namespace.Name + servicePort := 80 + + ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns) + jig := e2eservice.NewTestJig(cs, ns, serviceName) + svc, err := jig.CreateTCPService(func(svc *v1.Service) { + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)}, + } + svc.Spec.Type = v1.ServiceTypeLoadBalancer + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }) + framework.ExpectNoError(err) + + ginkgo.By("Creating 1 webserver pod to be part of the TCP service") + webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600") + webserverPod0.Labels = jig.Labels + webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600) + e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + _, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)) + validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}}) + + pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout)) + + nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP) + healthCheckNodePortAddr := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.HealthCheckNodePort))) + // validate that the health check node port from kube-proxy returns 200 when there are ready endpoints + err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --connect-timeout 5 http://%s/healthz`, healthCheckNodePortAddr) + out, err := framework.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd) + if err != nil { + return false, err + } + + expectedOut := "200" + if out != expectedOut { + return false, nil + } + return true, nil + }) + framework.ExpectNoError(err) + + // webserver should continue to serve traffic through the Service after deletion, even though the health check node port should return 503 + ginkgo.By("Terminating the webserver pod") + err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // validate that the health check node port from kube-proxy returns 503 when there are no ready endpoints + err = 
wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --connect-timeout 5 http://%s/healthz`, healthCheckNodePortAddr) + out, err := framework.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd) + if err != nil { + return false, err + } + + expectedOut := "503" + if out != expectedOut { + return false, nil + } + return true, nil + }) + framework.ExpectNoError(err) + + // also verify that while health check node port indicates 0 endpoints and returns 503, the endpoint still serves traffic. + nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort))) + execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name) + }) + + ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Cluster [Feature:ProxyTerminatingEndpoints]", func() { + // windows kube-proxy does not support this feature yet + e2eskipper.SkipIfNodeOSDistroIs("windows") + + // This behavior is not supported if Kube-proxy is in "userspace" mode. + // So we check the kube-proxy mode and skip this test if that's the case. + if proxyMode, err := proxyMode(f); err == nil { + if proxyMode == "userspace" { + e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode") + } + } else { + framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err) + } + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2) + framework.ExpectNoError(err) + nodeCounts := len(nodes.Items) + if nodeCounts < 2 { + e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts) + } + node0 := nodes.Items[0] + node1 := nodes.Items[1] + + serviceName := "svc-proxy-terminating" + ns := f.Namespace.Name + servicePort := 80 + + ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns) + jig := e2eservice.NewTestJig(cs, ns, serviceName) + svc, err := jig.CreateTCPService(func(svc *v1.Service) { + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)}, + } + }) + framework.ExpectNoError(err) + + ginkgo.By("Creating 1 webserver pod to be part of the TCP service") + webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600") + webserverPod0.Labels = jig.Labels + webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600) + e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + _, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)) + validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}}) + + ginkgo.By("Creating 2 pause pods that will try to connect to the webservers") + pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout)) + + 
pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name}) + + pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)) + + // webserver should continue to serve traffic through the Service after delete since: + // - it has a 600s termination grace period + // - it is the only ready endpoint + err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // assert 5 times that both the local and remote pod can connect to the Service while all endpoints are terminating + serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort)) + for i := 0; i < 5; i++ { + // There's a Service with internalTrafficPolicy=Cluster, + // with a single endpoint (which is terminating) called webserver0 running on node0. + // pausePod0 and pausePod1 are on node0 and node1 respectively. + // pausePod0 -> Service clusterIP succeeds because traffic policy is "Cluster" + // pausePod1 -> Service clusterIP succeeds because traffic policy is "Cluster" + execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name) + execHostnameTest(*pausePod1, serviceAddress, webserverPod0.Name) + + time.Sleep(5 * time.Second) + } + }) + + ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Local [Feature:ProxyTerminatingEndpoints]", func() { + // windows kube-proxy does not support this feature yet + e2eskipper.SkipIfNodeOSDistroIs("windows") + + // This behavior is not supported if Kube-proxy is in "userspace" mode. + // So we check the kube-proxy mode and skip this test if that's the case. 
+ if proxyMode, err := proxyMode(f); err == nil { + if proxyMode == "userspace" { + e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode") + } + } else { + framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err) + } + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2) + framework.ExpectNoError(err) + nodeCounts := len(nodes.Items) + if nodeCounts < 2 { + e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts) + } + node0 := nodes.Items[0] + node1 := nodes.Items[1] + + serviceName := "svc-proxy-terminating" + ns := f.Namespace.Name + servicePort := 80 + + ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns) + jig := e2eservice.NewTestJig(cs, ns, serviceName) + local := v1.ServiceInternalTrafficPolicyLocal + svc, err := jig.CreateTCPService(func(svc *v1.Service) { + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)}, + } + svc.Spec.InternalTrafficPolicy = &local + }) + framework.ExpectNoError(err) + + ginkgo.By("Creating 1 webserver pod to be part of the TCP service") + webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600") + webserverPod0.Labels = jig.Labels + webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600) + e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + _, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)) + validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}}) + + ginkgo.By("Creating 2 pause pods that will try to connect to the webservers") + pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout)) + + pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name}) + + pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)) + + // webserver should continue to serve traffic through the Service after delete since: + // - it has a 600s termination grace period + // - it is the only ready endpoint + err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout + serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort)) + for i := 0; i < 5; i++ { + // There's a Service with internalTrafficPolicy=Local, + // with a single endpoint (which is terminating) called webserver0 
+			// pausePod0 and pausePod1 are on node0 and node1 respectively.
+			// pausePod0 -> Service clusterIP succeeds because webserver0 is running on node0 and traffic policy is "Local"
+			// pausePod1 -> Service clusterIP fails because webserver0 is on a different node and traffic policy is "Local"
+			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
+
+			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to cluster IP")
+
+			time.Sleep(5 * time.Second)
+		}
+	})
+
+	ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Cluster [Feature:ProxyTerminatingEndpoints]", func() {
+		// windows kube-proxy does not support this feature yet
+		e2eskipper.SkipIfNodeOSDistroIs("windows")
+
+		// This behavior is not supported if Kube-proxy is in "userspace" mode.
+		// So we check the kube-proxy mode and skip this test if that's the case.
+		if proxyMode, err := proxyMode(f); err == nil {
+			if proxyMode == "userspace" {
+				e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
+			}
+		} else {
+			framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
+		}
+
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-proxy-terminating"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+		})
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
+		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
+
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
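+		// pause-pod-1 is pinned to node1 below so the NodePort is also exercised from a node without a local endpoint.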
"pause-pod-1", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name}) + + pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{}) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)) + + // webserver should continue to serve traffic through the Service after delete since: + // - it has a 600s termination grace period + // - it is the only ready endpoint + err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // assert 5 times that both the local and remote pod can connect to the Service NodePort while all endpoints are terminating + nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP) + nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort))) + for i := 0; i < 5; i++ { + // There's a Service Type=NodePort with externalTrafficPolicy=Cluster, + // with a single endpoint (which is terminating) called webserver0 running on node0. + // pausePod0 and pausePod1 are on node0 and node1 respectively. + // pausePod0 -> node0 node port succeeds because webserver0 is running on node0 and traffic policy is "Cluster" + // pausePod1 -> node0 node port succeeds because webserver0 is running on node0 and traffic policy is "Cluster" + execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name) + execHostnameTest(*pausePod1, nodePortAddress, webserverPod0.Name) + + time.Sleep(5 * time.Second) + } + }) + + ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Local [Feature:ProxyTerminatingEndpoints]", func() { + // windows kube-proxy does not support this feature yet + e2eskipper.SkipIfNodeOSDistroIs("windows") + + // This behavior is not supported if Kube-proxy is in "userspace" mode. + // So we check the kube-proxy mode and skip this test if that's the case. 
+		if proxyMode, err := proxyMode(f); err == nil {
+			if proxyMode == "userspace" {
+				e2eskipper.Skipf("The test doesn't work with kube-proxy in userspace mode")
+			}
+		} else {
+			framework.Logf("Couldn't detect KubeProxy mode - test failure may be expected: %v", err)
+		}
+
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-proxy-terminating"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
+		})
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "600")
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(600)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
+		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		pausePod0, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
+
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
+
+		pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
+
+		// webserver should continue to serve traffic through the Service after delete since:
+		// - it has a 600s termination grace period
+		// - it is the only ready endpoint
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// assert 5 times that pausePod0 cannot reach node1's node port (no local endpoints there) while the remaining node port paths succeed
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		for i := 0; i < 5; i++ {
+			// There's a Service Type=NodePort with externalTrafficPolicy=Local,
+			// with a single endpoint (which is terminating) called webserver0 running on node0.
+			// pausePod0 and pausePod1 are on node0 and node1 respectively.
+			// pausePod0 -> node1 node port fails because it's "external" and there are no local endpoints
+			// pausePod1 -> node0 node port succeeds because webserver0 is running on node0
+			// pausePod0 -> node0 and pausePod1 -> node1 both succeed because pod-to-same-node-NodePort
+			// connections are neither internal nor external and always get Cluster traffic policy.
+			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
+			_, err := framework.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to node port for pausePod0")
+
+			execHostnameTest(*pausePod0, nodePortAddress0, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
+
+			time.Sleep(5 * time.Second)
+		}
+	})
+
 	/*
 		Release: v1.18
 		Testname: Find Kubernetes Service in default Namespace
diff --git a/test/utils/image/manifest.go b/test/utils/image/manifest.go
index 113b9b99644..bc8f4280951 100644
--- a/test/utils/image/manifest.go
+++ b/test/utils/image/manifest.go
@@ -231,7 +231,7 @@ const (
 func initImageConfigs(list RegistryList) (map[int]Config, map[int]Config) {
 	configs := map[int]Config{}
-	configs[Agnhost] = Config{list.PromoterE2eRegistry, "agnhost", "2.33"}
+	configs[Agnhost] = Config{list.PromoterE2eRegistry, "agnhost", "2.36"}
 	configs[AgnhostPrivate] = Config{list.PrivateRegistry, "agnhost", "2.6"}
 	configs[AuthenticatedAlpine] = Config{list.GcAuthenticatedRegistry, "alpine", "3.7"}
 	configs[AuthenticatedWindowsNanoServer] = Config{list.GcAuthenticatedRegistry, "windows-nanoserver", "v1"}
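Aside (not part of the patch): the fallback rule these e2e tests rely on can be summarized as "prefer ready endpoints; only when none are ready, use endpoints that are still serving while terminating". The sketch below illustrates that rule against the discovery/v1 EndpointSlice condition fields; the helper name usableEndpoints and the standalone main are illustrative only and do not reproduce kube-proxy's actual code paths.

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
)

// boolVal treats a nil condition pointer as false.
func boolVal(b *bool) bool { return b != nil && *b }

// usableEndpoints returns the ready endpoints, or, if there are none,
// the serving-but-terminating ones (the ProxyTerminatingEndpoints fallback).
func usableEndpoints(eps []discovery.Endpoint) []discovery.Endpoint {
	var ready, terminating []discovery.Endpoint
	for _, ep := range eps {
		switch {
		case boolVal(ep.Conditions.Ready):
			ready = append(ready, ep)
		case boolVal(ep.Conditions.Serving) && boolVal(ep.Conditions.Terminating):
			terminating = append(terminating, ep)
		}
	}
	if len(ready) > 0 {
		return ready
	}
	return terminating
}

func main() {
	t, f := true, false
	eps := []discovery.Endpoint{
		// terminating but still serving: usable only as a fallback
		{Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{Ready: &f, Serving: &t, Terminating: &t}},
		// terminating and no longer serving: never usable
		{Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{Ready: &f, Serving: &f, Terminating: &t}},
	}
	// With no ready endpoints, only 10.0.1.1 is printed.
	for _, ep := range usableEndpoints(eps) {
		fmt.Println(ep.Addresses[0])
	}
}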