diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index a61ae327267..9de9aa7204a 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1321,6 +1321,11 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } + // Accept all traffic with destination of ipvs virtual service, in case other iptables rules + // block the traffic, that may result in ipvs rules invalid. + // Those rules must be in the end of KUBE-SERVICE chain + proxier.acceptIPVSTraffic() + // If the masqueradeMark has been added then we want to forward that same // traffic, this allows NodePort traffic to be forwarded even if the default // FORWARD policy is not accept. @@ -1418,6 +1423,26 @@ func (proxier *Proxier) syncProxyRules() { proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } +func (proxier *Proxier) acceptIPVSTraffic() { + sets := []*IPSet{proxier.clusterIPSet, proxier.externalIPSet, proxier.lbIngressSet} + for _, set := range sets { + var matchType string + if !set.isEmpty() { + switch set.SetType { + case utilipset.BitmapPort: + matchType = "dst" + default: + matchType = "dst,dst" + } + writeLine(proxier.natRules, []string{ + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", set.Name, matchType, + "-j", "ACCEPT", + }...) + } + } +} + // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 0350fb88cb6..e540c68df68 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -981,6 +981,71 @@ func TestLoadBalanceSourceRanges(t *testing.T) { checkIptables(t, ipt, epIpt) } +func TestAcceptIPVSTraffic(t *testing.T) { + ipt, fp := buildFakeProxier(nil) + + ingressIP := "1.2.3.4" + externalIP := []string{"5.6.7.8"} + svcInfos := []struct { + svcType api.ServiceType + svcIP string + svcName string + epIP string + }{ + {api.ServiceTypeClusterIP, "10.20.30.40", "svc1", "10.180.0.1"}, + {api.ServiceTypeLoadBalancer, "10.20.30.41", "svc2", "10.180.0.2"}, + {api.ServiceTypeNodePort, "10.20.30.42", "svc3", "10.180.0.3"}, + } + + for _, svcInfo := range svcInfos { + makeServiceMap(fp, + makeTestService("ns1", svcInfo.svcName, func(svc *api.Service) { + svc.Spec.Type = svcInfo.svcType + svc.Spec.ClusterIP = svcInfo.svcIP + svc.Spec.Ports = []api.ServicePort{{ + Name: "p80", + Port: 80, + Protocol: api.ProtocolTCP, + NodePort: 80, + }} + if svcInfo.svcType == api.ServiceTypeLoadBalancer { + svc.Status.LoadBalancer.Ingress = []api.LoadBalancerIngress{{ + IP: ingressIP, + }} + } + if svcInfo.svcType == api.ServiceTypeClusterIP { + svc.Spec.ExternalIPs = externalIP + } + }), + ) + + makeEndpointsMap(fp, + makeTestEndpoints("ns1", "p80", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: svcInfo.epIP, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: 80, + }}, + }} + }), + ) + } + fp.syncProxyRules() + + // Check iptables chain and rules + epIpt := netlinktest.ExpectedIptablesChain{ + string(kubeServicesChain): { + {JumpChain: "ACCEPT", MatchSet: KubeClusterIPSet}, + {JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerSet}, + {JumpChain: "ACCEPT", MatchSet: KubeExternalIPSet}, + }, + } + checkIptables(t, ipt, epIpt) +} + func TestOnlyLocalLoadBalancing(t *testing.T) { ipt, fp := buildFakeProxier(nil)