Merge pull request #97824 from hanlins/fix/97225/hc-rules

Explicitly add iptables rule to allow healthcheck nodeport
This commit is contained in:
Kubernetes Prow Robot
2021-02-04 15:54:52 -08:00
committed by GitHub
6 changed files with 336 additions and 3 deletions

View File

@@ -395,6 +395,7 @@ type iptablesJumpChain struct {
var iptablesJumpChains = []iptablesJumpChain{
{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeNodePortsChain, utiliptables.ChainInput, "kubernetes health check service ports", nil},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
@@ -479,7 +480,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
filterChains := bytes.NewBuffer(nil)
filterRules := bytes.NewBuffer(nil)
utilproxy.WriteLine(filterChains, "*filter")
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if _, found := existingFilterChains[chain]; found {
chainString := string(chain)
utilproxy.WriteBytesLine(filterChains, existingFilterChains[chain])
@@ -918,7 +919,7 @@ func (proxier *Proxier) syncProxyRules() {
// Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above).
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if chain, ok := existingFilterChains[chainName]; ok {
utilproxy.WriteBytesLine(proxier.filterChains, chain)
} else {
@@ -1328,6 +1329,19 @@ func (proxier *Proxier) syncProxyRules() {
}
}
// Capture healthCheckNodePorts.
if svcInfo.HealthCheckNodePort() != 0 {
// no matter if node has local endpoints, healthCheckNodePorts
// need to add a rule to accept the incoming connection
utilproxy.WriteLine(proxier.filterRules,
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcNameString),
"-m", "tcp", "-p", "tcp",
"--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()),
"-j", "ACCEPT",
)
}
if !hasEndpoints {
continue
}

View File

@@ -454,6 +454,14 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
}
}
// fakeCloseable implements utilproxy.Closeable
type fakeCloseable struct{}
// Close fakes out the close() used by syncProxyRules to release a local port.
func (f *fakeCloseable) Close() error {
return nil
}
// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*utilproxy.LocalPort
@@ -463,7 +471,7 @@ type fakePortOpener struct {
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return nil, nil
return &fakeCloseable{}, nil
}
const testHostname = "test-hostname"
@@ -913,6 +921,50 @@ func TestNodePort(t *testing.T) {
}
}
func TestHealthCheckNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.42"
svcPort := 80
svcNodePort := 3001
svcHealthCheckNodePort := 30000
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Spec.HealthCheckNodePort = int32(svcHealthCheckNodePort)
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
}),
)
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}}
itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}}
fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodePortAddresses = []string{"127.0.0.1/16"}
fp.syncProxyRules()
kubeNodePortsRules := ipt.GetRules(string(kubeNodePortsChain))
if !hasJump(kubeNodePortsRules, iptablestest.Accept, "", svcHealthCheckNodePort) {
errorf(fmt.Sprintf("Failed to find Accept rule"), kubeNodePortsRules, t)
}
}
func TestMasqueradeRule(t *testing.T) {
for _, testcase := range []bool{false, true} {
ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
@@ -2594,6 +2646,7 @@ func TestEndpointSliceE2E(t *testing.T) {
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
@@ -2686,4 +2739,113 @@ COMMIT
assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String())
}
func TestHealthCheckNodePortE2E(t *testing.T) {
expectedIPTables := `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-NODEPORTS -m comment --comment "ns1/svc1 health check node port" -m tcp -p tcp --dport 30000 -j ACCEPT
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -s 127.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -j KUBE-XLB-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, true)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
namespaceName := "ns1"
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), NodePort: 30010, Protocol: v1.ProtocolTCP}},
Type: "LoadBalancer",
HealthCheckNodePort: 30000,
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
},
}
fp.OnServiceAdd(svc)
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node2"},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node3"},
}, { // not ready endpoints should be ignored
Addresses: []string{"10.0.1.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
Topology: map[string]string{"kubernetes.io/hostname": "node4"},
}},
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
assert.Equal(t, expectedIPTables, fp.iptablesData.String())
fp.OnServiceDelete(svc)
fp.syncProxyRules()
assert.NotEqual(t, expectedIPTables, fp.iptablesData.String())
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.