Refactor handling of local traffic detection.

This commit is contained in:
Satyadeep Musuvathy
2020-01-16 18:51:31 -08:00
parent 3631887a28
commit 8c6956e5bb
24 changed files with 804 additions and 101 deletions

View File

@@ -17,6 +17,7 @@ go_library(
"//pkg/proxy/metaproxier:go_default_library",
"//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/proxy/util/iptables:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library",
@@ -41,6 +42,7 @@ go_test(
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/proxy/util/iptables:go_default_library",
"//pkg/proxy/util/testing:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/conntrack:go_default_library",

View File

@@ -33,7 +33,7 @@ import (
"sync/atomic"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
@@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@@ -201,7 +202,7 @@ type Proxier struct {
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
clusterCIDR string
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
portMapper utilproxy.PortOpener
@@ -260,7 +261,7 @@ func NewProxier(ipt utiliptables.Interface,
minSyncPeriod time.Duration,
masqueradeAll bool,
masqueradeBit int,
clusterCIDR string,
localDetector proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP net.IP,
recorder record.EventRecorder,
@@ -285,12 +286,6 @@ func NewProxier(ipt utiliptables.Interface,
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
if len(clusterCIDR) == 0 {
klog.Warning("clusterCIDR not specified, unable to distinguish between internal and external traffic")
} else if utilnet.IsIPv6CIDRString(clusterCIDR) != ipt.IsIpv6() {
return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, ipt.IsIpv6())
}
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
@@ -307,7 +302,7 @@ func NewProxier(ipt utiliptables.Interface,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
@@ -345,7 +340,7 @@ func NewDualStackProxier(
minSyncPeriod time.Duration,
masqueradeAll bool,
masqueradeBit int,
clusterCIDR [2]string,
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP [2]net.IP,
recorder record.EventRecorder,
@@ -354,17 +349,15 @@ func NewDualStackProxier(
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(ipt[0], sysctl,
exec, syncPeriod, minSyncPeriod,
masqueradeAll, masqueradeBit, clusterCIDR[0], hostname, nodeIP[0],
recorder, healthzServer, nodePortAddresses)
exec, syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
nodeIP[0], recorder, healthzServer, nodePortAddresses)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(ipt[1], sysctl,
exec, syncPeriod, minSyncPeriod,
masqueradeAll, masqueradeBit, clusterCIDR[1], hostname, nodeIP[1],
recorder, healthzServer, nodePortAddresses)
exec, syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname,
nodeIP[1], recorder, healthzServer, nodePortAddresses)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
}
@@ -1014,13 +1007,13 @@ func (proxier *Proxier) syncProxyRules() {
)
if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
} else if len(proxier.clusterCIDR) > 0 {
} else if proxier.localDetector.IsImplemented() {
// This masquerades off-cluster traffic to a service VIP. The idea
// is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this.
writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...)
}
writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
} else {
@@ -1374,16 +1367,14 @@ func (proxier *Proxier) syncProxyRules() {
// First rule in the chain redirects all pod -> external VIP traffic to the
// Service's ClusterIP instead. This happens whether or not we have local
// endpoints; only if clusterCIDR is specified
if len(proxier.clusterCIDR) > 0 {
// endpoints; only if localDetector is implemented
if proxier.localDetector.IsImplemented() {
args = append(args[:0],
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
"-s", proxier.clusterCIDR,
"-j", string(svcChain),
)
writeLine(proxier.natRules, args...)
writeLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...)
}
// Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local
@@ -1513,29 +1504,23 @@ func (proxier *Proxier) syncProxyRules() {
"-j", "ACCEPT",
)
// The following rules can only be set if clusterCIDR has been defined.
if len(proxier.clusterCIDR) != 0 {
// The following two rules ensure the traffic after the initial packet
// accepted by the "kubernetes forwarding rules" rule above will be
// accepted, to be as specific as possible the traffic must be sourced
// or destined to the clusterCIDR (to/from a pod).
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-s", proxier.clusterCIDR,
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
"-d", proxier.clusterCIDR,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
}
// The following two rules ensure the traffic after the initial packet
// accepted by the "kubernetes forwarding rules" rule above will be
// accepted.
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
// Write the end-of-table markers.
writeLine(proxier.filterRules, "COMMIT")

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
@@ -345,6 +346,7 @@ const testHostname = "test-hostname"
func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method.
detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/24", ipt)
p := &Proxier{
exec: &fakeexec.FakeExec{},
serviceMap: make(proxy.ServiceMap),
@@ -352,7 +354,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
localDetector: detectLocal,
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
@@ -958,8 +960,6 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, false)
// set cluster CIDR to empty before test
fp.clusterCIDR = ""
onlyLocalNodePorts(t, fp, ipt)
}
@@ -2342,8 +2342,8 @@ func TestEndpointSliceE2E(t *testing.T) {
:KUBE-FORWARD - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]