mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Add rule to allow healthcheck nodeport traffic in filter table
1. For iptables mode, add KUBE-NODEPORTS chain in filter table. Add rules to allow healthcheck node port traffic. 2. For ipvs mode, add KUBE-NODE-PORT chain in filter table. Add KUBE-HEALTH-CHECK-NODE-PORT ipset to allow traffic to healthcheck node port.
This commit is contained in:
parent
228d5f2002
commit
4cd1eacbc1
@ -395,6 +395,7 @@ type iptablesJumpChain struct {
|
||||
var iptablesJumpChains = []iptablesJumpChain{
|
||||
{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
|
||||
{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
|
||||
{utiliptables.TableFilter, kubeNodePortsChain, utiliptables.ChainInput, "kubernetes health check service ports", nil},
|
||||
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
|
||||
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
|
||||
{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
|
||||
@ -479,7 +480,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
|
||||
filterChains := bytes.NewBuffer(nil)
|
||||
filterRules := bytes.NewBuffer(nil)
|
||||
utilproxy.WriteLine(filterChains, "*filter")
|
||||
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
|
||||
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
|
||||
if _, found := existingFilterChains[chain]; found {
|
||||
chainString := string(chain)
|
||||
utilproxy.WriteBytesLine(filterChains, existingFilterChains[chain])
|
||||
@ -933,7 +934,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// Make sure we keep stats for the top-level chains, if they existed
|
||||
// (which most should have because we created them above).
|
||||
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
|
||||
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
|
||||
if chain, ok := existingFilterChains[chainName]; ok {
|
||||
utilproxy.WriteBytesLine(proxier.filterChains, chain)
|
||||
} else {
|
||||
@ -1337,6 +1338,19 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
// Capture healthCheckNodePorts.
|
||||
if svcInfo.HealthCheckNodePort() != 0 {
|
||||
// no matter if node has local endpoints, healthCheckNodePorts
|
||||
// need to add a rule to accept the incoming connection
|
||||
utilproxy.WriteLine(proxier.filterRules,
|
||||
"-A", string(kubeNodePortsChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcNameString),
|
||||
"-m", "tcp", "-p", "tcp",
|
||||
"--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()),
|
||||
"-j", "ACCEPT",
|
||||
)
|
||||
}
|
||||
|
||||
if !hasEndpoints {
|
||||
continue
|
||||
}
|
||||
|
@ -454,6 +454,14 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// fakeCloseable implements utilproxy.Closeable
|
||||
type fakeCloseable struct{}
|
||||
|
||||
// Close fakes out the close() used by syncProxyRules to release a local port.
|
||||
func (f *fakeCloseable) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// fakePortOpener implements portOpener.
|
||||
type fakePortOpener struct {
|
||||
openPorts []*utilproxy.LocalPort
|
||||
@ -463,7 +471,7 @@ type fakePortOpener struct {
|
||||
// to lock a local port.
|
||||
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
|
||||
f.openPorts = append(f.openPorts, lp)
|
||||
return nil, nil
|
||||
return &fakeCloseable{}, nil
|
||||
}
|
||||
|
||||
const testHostname = "test-hostname"
|
||||
@ -913,6 +921,50 @@ func TestNodePort(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthCheckNodePort(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt, false)
|
||||
svcIP := "10.20.30.42"
|
||||
svcPort := 80
|
||||
svcNodePort := 3001
|
||||
svcHealthCheckNodePort := 30000
|
||||
svcPortName := proxy.ServicePortName{
|
||||
NamespacedName: makeNSN("ns1", "svc1"),
|
||||
Port: "p80",
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
|
||||
svc.Spec.Type = "LoadBalancer"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
NodePort: int32(svcNodePort),
|
||||
}}
|
||||
svc.Spec.HealthCheckNodePort = int32(svcHealthCheckNodePort)
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
}),
|
||||
)
|
||||
|
||||
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
|
||||
addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}}
|
||||
itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
|
||||
addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}}
|
||||
fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
|
||||
fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
|
||||
fp.nodePortAddresses = []string{"127.0.0.1/16"}
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
kubeNodePortsRules := ipt.GetRules(string(kubeNodePortsChain))
|
||||
if !hasJump(kubeNodePortsRules, iptablestest.Accept, "", svcHealthCheckNodePort) {
|
||||
errorf(fmt.Sprintf("Failed to find Accept rule"), kubeNodePortsRules, t)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMasqueradeRule(t *testing.T) {
|
||||
for _, testcase := range []bool{false, true} {
|
||||
ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
|
||||
@ -2594,6 +2646,7 @@ func TestEndpointSliceE2E(t *testing.T) {
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
@ -2686,4 +2739,113 @@ COMMIT
|
||||
assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String())
|
||||
}
|
||||
|
||||
func TestHealthCheckNodePortE2E(t *testing.T) {
|
||||
expectedIPTables := `*filter
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-EXTERNAL-SERVICES - [0:0]
|
||||
:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
-A KUBE-NODEPORTS -m comment --comment "ns1/svc1 health check node port" -m tcp -p tcp --dport 30000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
||||
COMMIT
|
||||
*nat
|
||||
:KUBE-SERVICES - [0:0]
|
||||
:KUBE-NODEPORTS - [0:0]
|
||||
:KUBE-POSTROUTING - [0:0]
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
|
||||
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
|
||||
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
|
||||
:KUBE-SEP-XGJFVO3L2O5SRFNT - [0:0]
|
||||
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
|
||||
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
|
||||
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
|
||||
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -s 127.0.0.0/8 -j KUBE-MARK-MASQ
|
||||
-A KUBE-NODEPORTS -m comment --comment ns1/svc1 -m tcp -p tcp --dport 30010 -j KUBE-XLB-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -s 10.0.1.2/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-IO5XOSKPAXIFQXAJ -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
|
||||
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
|
||||
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -s 10.0.1.3/32 -j KUBE-MARK-MASQ
|
||||
-A KUBE-SEP-XGJFVO3L2O5SRFNT -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
|
||||
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/24 -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
|
||||
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
|
||||
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Balancing rule 0 for ns1/svc1" -j KUBE-SEP-3JOIVZTXZZRGORX4
|
||||
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
|
||||
COMMIT
|
||||
`
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt, true)
|
||||
fp.OnServiceSynced()
|
||||
fp.OnEndpointsSynced()
|
||||
fp.OnEndpointSlicesSynced()
|
||||
|
||||
serviceName := "svc1"
|
||||
namespaceName := "ns1"
|
||||
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.20.1.1",
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), NodePort: 30010, Protocol: v1.ProtocolTCP}},
|
||||
Type: "LoadBalancer",
|
||||
HealthCheckNodePort: 30000,
|
||||
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
|
||||
},
|
||||
}
|
||||
fp.OnServiceAdd(svc)
|
||||
|
||||
tcpProtocol := v1.ProtocolTCP
|
||||
endpointSlice := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-1", serviceName),
|
||||
Namespace: namespaceName,
|
||||
Labels: map[string]string{discovery.LabelServiceName: serviceName},
|
||||
},
|
||||
Ports: []discovery.EndpointPort{{
|
||||
Name: utilpointer.StringPtr(""),
|
||||
Port: utilpointer.Int32Ptr(80),
|
||||
Protocol: &tcpProtocol,
|
||||
}},
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
Endpoints: []discovery.Endpoint{{
|
||||
Addresses: []string{"10.0.1.1"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
|
||||
}, {
|
||||
Addresses: []string{"10.0.1.2"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node2"},
|
||||
}, {
|
||||
Addresses: []string{"10.0.1.3"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node3"},
|
||||
}, { // not ready endpoints should be ignored
|
||||
Addresses: []string{"10.0.1.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node4"},
|
||||
}},
|
||||
}
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
fp.syncProxyRules()
|
||||
assert.Equal(t, expectedIPTables, fp.iptablesData.String())
|
||||
|
||||
fp.OnServiceDelete(svc)
|
||||
fp.syncProxyRules()
|
||||
assert.NotEqual(t, expectedIPTables, fp.iptablesData.String())
|
||||
}
|
||||
|
||||
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
|
||||
|
@ -75,6 +75,9 @@ const (
|
||||
|
||||
kubeNodePortLocalSetSCTPComment = "Kubernetes nodeport SCTP port with externalTrafficPolicy=local with type 'hash ip:port'"
|
||||
kubeNodePortLocalSetSCTP = "KUBE-NODE-PORT-LOCAL-SCTP-HASH"
|
||||
|
||||
kubeHealthCheckNodePortSetComment = "Kubernetes health check node port"
|
||||
kubeHealthCheckNodePortSet = "KUBE-HEALTH-CHECK-NODE-PORT"
|
||||
)
|
||||
|
||||
// IPSetVersioner can query the current ipset version.
|
||||
|
@ -106,6 +106,7 @@ var iptablesJumpChain = []struct {
|
||||
{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
|
||||
{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
|
||||
{utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"},
|
||||
{utiliptables.TableFilter, utiliptables.ChainInput, KubeNodePortChain, "kubernetes health check rules"},
|
||||
}
|
||||
|
||||
var iptablesChains = []struct {
|
||||
@ -119,6 +120,7 @@ var iptablesChains = []struct {
|
||||
{utiliptables.TableNAT, KubeLoadBalancerChain},
|
||||
{utiliptables.TableNAT, KubeMarkMasqChain},
|
||||
{utiliptables.TableFilter, KubeForwardChain},
|
||||
{utiliptables.TableFilter, KubeNodePortChain},
|
||||
}
|
||||
|
||||
var iptablesEnsureChains = []struct {
|
||||
@ -161,6 +163,7 @@ var ipsetInfo = []struct {
|
||||
{kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
|
||||
{kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
|
||||
{kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
|
||||
{kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment},
|
||||
}
|
||||
|
||||
// ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
|
||||
@ -1581,6 +1584,22 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if svcInfo.HealthCheckNodePort() != 0 {
|
||||
nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
|
||||
entry := &utilipset.Entry{
|
||||
// No need to provide ip info
|
||||
Port: svcInfo.HealthCheckNodePort(),
|
||||
Protocol: "tcp",
|
||||
SetType: utilipset.BitmapPort,
|
||||
}
|
||||
|
||||
if valid := nodePortSet.validateEntry(entry); !valid {
|
||||
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
|
||||
continue
|
||||
}
|
||||
nodePortSet.activeEntries.Insert(entry.String())
|
||||
}
|
||||
}
|
||||
|
||||
// sync ipset entries
|
||||
@ -1817,6 +1836,14 @@ func (proxier *Proxier) writeIptablesRules() {
|
||||
"-j", "ACCEPT",
|
||||
)
|
||||
|
||||
// Add rule to accept traffic towards health check node port
|
||||
utilproxy.WriteLine(proxier.filterRules,
|
||||
"-A", string(KubeNodePortChain),
|
||||
"-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(),
|
||||
"-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst",
|
||||
"-j", "ACCEPT",
|
||||
)
|
||||
|
||||
// Install the kubernetes-specific postrouting rules. We use a whole chain for
|
||||
// this so that it is easier to flush and change, for example if the mark
|
||||
// value should ever change.
|
||||
|
@ -1920,6 +1920,75 @@ func TestOnlyLocalNodePorts(t *testing.T) {
|
||||
checkIptables(t, ipt, epIpt)
|
||||
}
|
||||
|
||||
func TestHealthCheckNodePort(t *testing.T) {
|
||||
ipt, fp := buildFakeProxier()
|
||||
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcNodePort := 3000
|
||||
svcPortName := proxy.ServicePortName{
|
||||
NamespacedName: makeNSN("ns1", "svc1"),
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
sampleSvc := makeTestService(svcPortName.Namespace, "", func(svc *v1.Service) {
|
||||
svc.Spec.Type = "LoadBalancer"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
NodePort: int32(svcNodePort),
|
||||
}}
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
})
|
||||
|
||||
svc1, svc2, invalidSvc3 := *sampleSvc, *sampleSvc, *sampleSvc
|
||||
svc1.Name, svc1.Spec.HealthCheckNodePort = "valid-svc1", 30000
|
||||
svc2.Name, svc2.Spec.HealthCheckNodePort = "valid-svc2", 30001
|
||||
// make svc3 invalid by setting external traffic policy to cluster
|
||||
invalidSvc3.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster
|
||||
invalidSvc3.Name, invalidSvc3.Spec.HealthCheckNodePort = "invalid-svc3", 30002
|
||||
|
||||
makeServiceMap(fp,
|
||||
&svc1,
|
||||
&svc2,
|
||||
&invalidSvc3,
|
||||
)
|
||||
|
||||
itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
|
||||
addrs := []net.Addr{proxyutiltest.AddrStruct{Val: "100.101.102.103/24"}}
|
||||
itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
|
||||
addrs1 := []net.Addr{proxyutiltest.AddrStruct{Val: "2001:db8::0/64"}}
|
||||
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
|
||||
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
|
||||
fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"}
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
// check ipSet rules
|
||||
makeTCPEntry := func(port int) *utilipset.Entry {
|
||||
return &utilipset.Entry{
|
||||
Port: port,
|
||||
Protocol: strings.ToLower(string(v1.ProtocolTCP)),
|
||||
SetType: utilipset.BitmapPort,
|
||||
}
|
||||
}
|
||||
epIPSet := netlinktest.ExpectedIPSet{
|
||||
// healthcheck node port set should only contain valid HC node ports
|
||||
kubeHealthCheckNodePortSet: {makeTCPEntry(30000), makeTCPEntry(30001)},
|
||||
}
|
||||
checkIPSet(t, fp, epIPSet)
|
||||
|
||||
// Check iptables chain and rules
|
||||
epIpt := netlinktest.ExpectedIptablesChain{
|
||||
string(KubeNodePortChain): {{
|
||||
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
|
||||
}},
|
||||
}
|
||||
checkIptables(t, ipt, epIpt)
|
||||
}
|
||||
|
||||
func TestLoadBalanceSourceRanges(t *testing.T) {
|
||||
ipt, fp := buildFakeProxier()
|
||||
|
||||
@ -4295,6 +4364,61 @@ func TestEndpointSliceE2E(t *testing.T) {
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 0, "Expected 0 real servers")
|
||||
}
|
||||
|
||||
func TestHealthCheckNodePortE2E(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true, v1.IPv4Protocol)
|
||||
fp.servicesSynced = true
|
||||
fp.endpointsSynced = true
|
||||
fp.endpointSlicesSynced = true
|
||||
|
||||
// Add initial service
|
||||
serviceName := "svc1"
|
||||
namespaceName := "ns1"
|
||||
|
||||
svc := v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "172.20.1.1",
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
|
||||
Type: "LoadBalancer",
|
||||
HealthCheckNodePort: 30000,
|
||||
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
|
||||
},
|
||||
}
|
||||
fp.OnServiceAdd(&svc)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after service's being created
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"])
|
||||
activeEntries1 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries
|
||||
assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT")
|
||||
assert.Equal(t, true, activeEntries1.Has("30000"), "Expected activeEntries to reference hc node port in spec")
|
||||
|
||||
// Update health check node port in the spec
|
||||
newSvc := svc
|
||||
newSvc.Spec.HealthCheckNodePort = 30001
|
||||
fp.OnServiceUpdate(&svc, &newSvc)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after service's being updated
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"])
|
||||
activeEntries2 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries
|
||||
assert.Equal(t, 1, activeEntries2.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT")
|
||||
assert.Equal(t, true, activeEntries2.Has("30001"), "Expected activeEntries to reference updated hc node port in spec")
|
||||
|
||||
fp.OnServiceDelete(&svc)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
|
||||
assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"])
|
||||
activeEntries3 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries
|
||||
assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-HEALTH-CHECK-NODE-PORT")
|
||||
}
|
||||
|
||||
func TestFilterCIDRs(t *testing.T) {
|
||||
var cidrList []string
|
||||
var cidrs []string
|
||||
@ -4339,6 +4463,7 @@ func TestCreateAndLinkKubeChain(t *testing.T) {
|
||||
:KUBE-MARK-MASQ - [0:0]
|
||||
`
|
||||
expectedFilterChains := `:KUBE-FORWARD - [0:0]
|
||||
:KUBE-NODE-PORT - [0:0]
|
||||
`
|
||||
assert.Equal(t, expectedNATChains, fp.natChains.String())
|
||||
assert.Equal(t, expectedFilterChains, fp.filterChains.String())
|
||||
|
@ -38,6 +38,8 @@ const (
|
||||
Jump = "-j "
|
||||
// Reject specifies the reject target
|
||||
Reject = "REJECT"
|
||||
// Accept specifies the accept target
|
||||
Accept = "ACCEPT"
|
||||
// ToDest represents the flag used to specify the destination address in DNAT
|
||||
ToDest = "--to-destination "
|
||||
// Recent represents the sub-command recent that allows to dynamically create list of IP address to match against
|
||||
|
Loading…
Reference in New Issue
Block a user