diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index 6cfc9fe681d..db32ed09245 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -79,6 +79,7 @@ go_library( ] + select({ "@io_bazel_rules_go//go/platform:android": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -87,6 +88,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:darwin": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -95,6 +97,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:dragonfly": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -103,6 +106,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:freebsd": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -111,6 +115,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:ios": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -119,6 +124,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:linux": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -127,6 +133,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:nacl": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -135,6 +142,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:netbsd": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -143,6 +151,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:openbsd": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -151,6 +160,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:plan9": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -159,6 +169,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:solaris": [ "//pkg/proxy/metrics:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -190,43 +201,76 @@ go_test( "//pkg/proxy/apis/config:go_default_library", "//pkg/util/configz:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/component-base/config:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ] + select({ "@io_bazel_rules_go//go/platform:android": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:darwin": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:dragonfly": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:freebsd": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:ios": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:linux": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:nacl": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:netbsd": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:openbsd": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:plan9": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "@io_bazel_rules_go//go/platform:solaris": [ "//pkg/proxy/ipvs:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", ], "//conditions:default": [], }), diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 033c297d15c..c9e2e86308e 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -203,6 +203,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler.") fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver") + fs.Var(&o.config.DetectLocalMode, "detect-local-mode", "Mode to use to detect local traffic") } // NewOptions returns initialized Options diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 23c977007f8..15d3a1ae178 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/ipvs" proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/userspace" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" "k8s.io/kubernetes/pkg/util/configz" utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -136,8 +137,14 @@ func newProxyServer( } var proxier proxy.Provider + var detectLocalMode proxyconfigapi.LocalMode proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{}) + detectLocalMode, err = getDetectLocalMode(config) + if err != nil { + return nil, fmt.Errorf("cannot determine detect-local-mode: %v", err) + } + nodeIP := net.ParseIP(config.BindAddress) if nodeIP.IsUnspecified() { nodeIP = utilnode.GetNodeIP(client, hostname) @@ -146,6 +153,9 @@ func newProxyServer( nodeIP = net.ParseIP("127.0.0.1") } } + + klog.V(2).Info("DetectLocalMode: '", string(detectLocalMode), "'") + if proxyMode == proxyModeIPTables { klog.V(0).Info("Using iptables Proxier.") if config.IPTables.MasqueradeBit == nil { @@ -167,6 +177,13 @@ func newProxyServer( ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIpv6) } + // Always ordered to match []ipt + var localDetectors [2]proxyutiliptables.LocalTrafficDetector + localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt) + if err != nil { + return nil, fmt.Errorf("unable to create proxier: %v", err) + } + // TODO this has side effects that should only happen when Run() is invoked. proxier, err = iptables.NewDualStackProxier( ipt, @@ -176,7 +193,7 @@ func newProxyServer( config.IPTables.MinSyncPeriod.Duration, config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), - cidrTuple(config.ClusterCIDR), + localDetectors, hostname, nodeIPTuple(config.BindAddress), recorder, @@ -184,6 +201,12 @@ func newProxyServer( config.NodePortAddresses, ) } else { // Create a single-stack proxier. + var localDetector proxyutiliptables.LocalTrafficDetector + localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface) + if err != nil { + return nil, fmt.Errorf("unable to create proxier: %v", err) + } + // TODO this has side effects that should only happen when Run() is invoked. proxier, err = iptables.NewProxier( iptInterface, @@ -193,7 +216,7 @@ func newProxyServer( config.IPTables.MinSyncPeriod.Duration, config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), - config.ClusterCIDR, + localDetector, hostname, nodeIP, recorder, @@ -222,6 +245,15 @@ func newProxyServer( ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIpv6) } + nodeIPs := nodeIPTuple(config.BindAddress) + + // Always ordered to match []ipt + var localDetectors [2]proxyutiliptables.LocalTrafficDetector + localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt) + if err != nil { + return nil, fmt.Errorf("unable to create proxier: %v", err) + } + proxier, err = ipvs.NewDualStackProxier( ipt, ipvsInterface, @@ -237,15 +269,21 @@ func newProxyServer( config.IPVS.UDPTimeout.Duration, config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), - cidrTuple(config.ClusterCIDR), + localDetectors, hostname, - nodeIPTuple(config.BindAddress), + nodeIPs, recorder, healthzServer, config.IPVS.Scheduler, config.NodePortAddresses, ) } else { + var localDetector proxyutiliptables.LocalTrafficDetector + localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface) + if err != nil { + return nil, fmt.Errorf("unable to create proxier: %v", err) + } + proxier, err = ipvs.NewProxier( iptInterface, ipvsInterface, @@ -261,7 +299,7 @@ func newProxyServer( config.IPVS.UDPTimeout.Duration, config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), - config.ClusterCIDR, + localDetector, hostname, nodeIP, recorder, @@ -317,6 +355,67 @@ func newProxyServer( }, nil } +func getDetectLocalMode(config *proxyconfigapi.KubeProxyConfiguration) (proxyconfigapi.LocalMode, error) { + mode := config.DetectLocalMode + switch mode { + case proxyconfigapi.LocalModeClusterCIDR: + return mode, nil + default: + if strings.TrimSpace(mode.String()) != "" { + return mode, fmt.Errorf("unknown detect-local-mode: %v", mode) + } + klog.V(4).Info("Defaulting detect-local-mode to ", string(proxyconfigapi.LocalModeClusterCIDR)) + return proxyconfigapi.LocalModeClusterCIDR, nil + } +} + +func getLocalDetector(mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, ipt utiliptables.Interface) (proxyutiliptables.LocalTrafficDetector, error) { + switch mode { + case proxyconfigapi.LocalModeClusterCIDR: + if len(strings.TrimSpace(config.ClusterCIDR)) == 0 { + klog.Warning("detect-local-mode set to ClusterCIDR, but no cluster CIDR defined") + break + } + return proxyutiliptables.NewDetectLocalByCIDR(config.ClusterCIDR, ipt) + } + klog.V(0).Info("detect-local-mode: ", string(mode), " , defaulting to no-op detect-local") + return proxyutiliptables.NewNoOpLocalDetector(), nil +} + +func getDualStackLocalDetectorTuple(mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, ipt [2]utiliptables.Interface) ([2]proxyutiliptables.LocalTrafficDetector, error) { + var err error + localDetectors := [2]proxyutiliptables.LocalTrafficDetector{proxyutiliptables.NewNoOpLocalDetector(), proxyutiliptables.NewNoOpLocalDetector()} + switch mode { + case proxyconfigapi.LocalModeClusterCIDR: + if len(strings.TrimSpace(config.ClusterCIDR)) == 0 { + klog.Warning("detect-local-mode set to ClusterCIDR, but no cluster CIDR defined") + break + } + + clusterCIDRs := cidrTuple(config.ClusterCIDR) + + if len(strings.TrimSpace(clusterCIDRs[0])) == 0 { + klog.Warning("detect-local-mode set to ClusterCIDR, but no IPv4 cluster CIDR defined, defaulting to no-op detect-local for IPv4") + } else { + localDetectors[0], err = proxyutiliptables.NewDetectLocalByCIDR(clusterCIDRs[0], ipt[0]) + if err != nil { // don't loose the original error + return localDetectors, err + } + } + + if len(strings.TrimSpace(clusterCIDRs[1])) == 0 { + klog.Warning("detect-local-mode set to ClusterCIDR, but no IPv6 cluster CIDR defined, , defaulting to no-op detect-local for IPv6") + } else { + localDetectors[1], err = proxyutiliptables.NewDetectLocalByCIDR(clusterCIDRs[1], ipt[1]) + } + return localDetectors, err + default: + klog.Warningf("unknown detect-local-mode: %v", mode) + } + klog.Warning("detect-local-mode: ", string(mode), " , defaulting to no-op detect-local") + return localDetectors, nil +} + // cidrTuple takes a comma separated list of CIDRs and return a tuple (ipv4cidr,ipv6cidr) // The returned tuple is guaranteed to have the order (ipv4,ipv6) and if no cidr from a family is found an // empty string "" is inserted. diff --git a/cmd/kube-proxy/app/server_others_test.go b/cmd/kube-proxy/app/server_others_test.go index eb6b086142a..56c66375ffa 100644 --- a/cmd/kube-proxy/app/server_others_test.go +++ b/cmd/kube-proxy/app/server_others_test.go @@ -20,9 +20,14 @@ package app import ( "fmt" + "reflect" "testing" + proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/pkg/proxy/ipvs" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + utiliptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" ) type fakeIPSetVersioner struct { @@ -145,3 +150,230 @@ func Test_getProxyMode(t *testing.T) { } } } + +func Test_getDetectLocalMode(t *testing.T) { + cases := []struct { + detectLocal string + expected proxyconfigapi.LocalMode + errExpected bool + }{ + { + detectLocal: "", + expected: proxyconfigapi.LocalModeClusterCIDR, + errExpected: false, + }, + { + detectLocal: string(proxyconfigapi.LocalModeClusterCIDR), + expected: proxyconfigapi.LocalModeClusterCIDR, + errExpected: false, + }, + { + detectLocal: "abcd", + expected: proxyconfigapi.LocalMode("abcd"), + errExpected: true, + }, + } + for i, c := range cases { + proxyConfig := &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalMode(c.detectLocal)} + r, err := getDetectLocalMode(proxyConfig) + if c.errExpected { + if err == nil { + t.Errorf("Expected error, but did not fail for mode %v", c.detectLocal) + } + continue + } + if err != nil { + t.Errorf("Got error parsing mode: %v", err) + continue + } + if r != c.expected { + t.Errorf("Case[%d] Expected %q got %q", i, c.expected, r) + } + } +} + +func Test_getLocalDetector(t *testing.T) { + cases := []struct { + mode proxyconfigapi.LocalMode + config *proxyconfigapi.KubeProxyConfiguration + ipt utiliptables.Interface + expected proxyutiliptables.LocalTrafficDetector + errExpected bool + }{ + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "10.0.0.0/14"}, + ipt: utiliptablestest.NewFake(), + expected: resolveLocalDetector(t)(proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/14", utiliptablestest.NewFake())), + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "2002::1234:abcd:ffff:c0a8:101/64"}, + ipt: utiliptablestest.NewIpv6Fake(), + expected: resolveLocalDetector(t)(proxyutiliptables.NewDetectLocalByCIDR("2002::1234:abcd:ffff:c0a8:101/64", utiliptablestest.NewIpv6Fake())), + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "10.0.0.0"}, + ipt: utiliptablestest.NewFake(), + expected: nil, + errExpected: true, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "2002::1234:abcd:ffff:c0a8:101"}, + ipt: utiliptablestest.NewIpv6Fake(), + expected: nil, + errExpected: true, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "10.0.0.0/14"}, + ipt: utiliptablestest.NewIpv6Fake(), + expected: nil, + errExpected: true, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "2002::1234:abcd:ffff:c0a8:101/64"}, + ipt: utiliptablestest.NewFake(), + expected: nil, + errExpected: true, + }, + { + mode: proxyconfigapi.LocalMode("abcd"), + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "10.0.0.0/14"}, + ipt: utiliptablestest.NewFake(), + expected: proxyutiliptables.NewNoOpLocalDetector(), + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: ""}, + ipt: utiliptablestest.NewFake(), + expected: proxyutiliptables.NewNoOpLocalDetector(), + errExpected: false, + }, + } + for i, c := range cases { + r, err := getLocalDetector(c.mode, c.config, c.ipt) + if c.errExpected { + if err == nil { + t.Errorf("Case[%d] Expected error, but succeeded with %v", i, r) + } + continue + } + if err != nil { + t.Errorf("Case[%d] Error resolving detect-local: %v", i, err) + continue + } + if !reflect.DeepEqual(r, c.expected) { + t.Errorf("Case[%d] Unexpected detect-local implementation, expected: %q, got: %q", i, c.expected, r) + } + } +} + +func Test_getDualStackLocalDetectorTuple(t *testing.T) { + cases := []struct { + mode proxyconfigapi.LocalMode + config *proxyconfigapi.KubeProxyConfiguration + ipt [2]utiliptables.Interface + expected [2]proxyutiliptables.LocalTrafficDetector + errExpected bool + }{ + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "10.0.0.0/14,2002::1234:abcd:ffff:c0a8:101/64"}, + ipt: [2]utiliptables.Interface{utiliptablestest.NewFake(), utiliptablestest.NewIpv6Fake()}, + expected: resolveDualStackLocalDetectors(t)( + proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/14", utiliptablestest.NewFake()))( + proxyutiliptables.NewDetectLocalByCIDR("2002::1234:abcd:ffff:c0a8:101/64", utiliptablestest.NewIpv6Fake())), + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "2002::1234:abcd:ffff:c0a8:101/64,10.0.0.0/14"}, + ipt: [2]utiliptables.Interface{utiliptablestest.NewFake(), utiliptablestest.NewIpv6Fake()}, + expected: resolveDualStackLocalDetectors(t)( + proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/14", utiliptablestest.NewFake()))( + proxyutiliptables.NewDetectLocalByCIDR("2002::1234:abcd:ffff:c0a8:101/64", utiliptablestest.NewIpv6Fake())), + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "10.0.0.0/14"}, + ipt: [2]utiliptables.Interface{utiliptablestest.NewFake(), utiliptablestest.NewIpv6Fake()}, + expected: [2]proxyutiliptables.LocalTrafficDetector{ + resolveLocalDetector(t)(proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/14", utiliptablestest.NewFake())), + proxyutiliptables.NewNoOpLocalDetector()}, + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: "2002::1234:abcd:ffff:c0a8:101/64"}, + ipt: [2]utiliptables.Interface{utiliptablestest.NewFake(), utiliptablestest.NewIpv6Fake()}, + expected: [2]proxyutiliptables.LocalTrafficDetector{ + proxyutiliptables.NewNoOpLocalDetector(), + resolveLocalDetector(t)(proxyutiliptables.NewDetectLocalByCIDR("2002::1234:abcd:ffff:c0a8:101/64", utiliptablestest.NewIpv6Fake()))}, + errExpected: false, + }, + { + mode: proxyconfigapi.LocalModeClusterCIDR, + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: ""}, + ipt: [2]utiliptables.Interface{utiliptablestest.NewFake(), utiliptablestest.NewIpv6Fake()}, + expected: [2]proxyutiliptables.LocalTrafficDetector{proxyutiliptables.NewNoOpLocalDetector(), proxyutiliptables.NewNoOpLocalDetector()}, + errExpected: false, + }, + { + mode: proxyconfigapi.LocalMode("abcd"), + config: &proxyconfigapi.KubeProxyConfiguration{ClusterCIDR: ""}, + ipt: [2]utiliptables.Interface{utiliptablestest.NewFake(), utiliptablestest.NewIpv6Fake()}, + expected: [2]proxyutiliptables.LocalTrafficDetector{proxyutiliptables.NewNoOpLocalDetector(), proxyutiliptables.NewNoOpLocalDetector()}, + errExpected: false, + }, + } + for i, c := range cases { + r, err := getDualStackLocalDetectorTuple(c.mode, c.config, c.ipt) + if c.errExpected { + if err == nil { + t.Errorf("Case[%d] expected error, but succeeded with %q", i, r) + } + continue + } + if err != nil { + t.Errorf("Case[%d] Error resolving detect-local: %v", i, err) + continue + } + if !reflect.DeepEqual(r, c.expected) { + t.Errorf("Case[%d] Unexpected detect-local implementation, expected: %q, got: %q", i, c.expected, r) + } + } +} + +func resolveLocalDetector(t *testing.T) func(proxyutiliptables.LocalTrafficDetector, error) proxyutiliptables.LocalTrafficDetector { + return func(localDetector proxyutiliptables.LocalTrafficDetector, err error) proxyutiliptables.LocalTrafficDetector { + t.Helper() + if err != nil { + t.Fatalf("Error resolving detect-local: %v", err) + } + return localDetector + } +} + +func resolveDualStackLocalDetectors(t *testing.T) func(localDetector proxyutiliptables.LocalTrafficDetector, err1 error) func(proxyutiliptables.LocalTrafficDetector, error) [2]proxyutiliptables.LocalTrafficDetector { + return func(localDetector proxyutiliptables.LocalTrafficDetector, err error) func(proxyutiliptables.LocalTrafficDetector, error) [2]proxyutiliptables.LocalTrafficDetector { + t.Helper() + if err != nil { + t.Fatalf("Error resolving dual stack detect-local: %v", err) + } + return func(otherLocalDetector proxyutiliptables.LocalTrafficDetector, err1 error) [2]proxyutiliptables.LocalTrafficDetector { + t.Helper() + if err1 != nil { + t.Fatalf("Error resolving dual stack detect-local: %v", err) + } + return [2]proxyutiliptables.LocalTrafficDetector{localDetector, otherLocalDetector} + } + } +} diff --git a/cmd/kube-proxy/app/server_test.go b/cmd/kube-proxy/app/server_test.go index 5887eabbb36..5674c3bee67 100644 --- a/cmd/kube-proxy/app/server_test.go +++ b/cmd/kube-proxy/app/server_test.go @@ -28,12 +28,13 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" utilpointer "k8s.io/utils/pointer" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/diff" componentbaseconfig "k8s.io/component-base/config" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/pkg/util/configz" @@ -148,6 +149,7 @@ mode: "%s" oomScoreAdj: 17 portRange: "2-7" udpIdleTimeout: 123ms +detectLocalMode: "ClusterCIDR" nodePortAddresses: - "10.20.30.40/16" - "fd00:1::0/64" @@ -288,6 +290,7 @@ nodePortAddresses: PortRange: "2-7", UDPIdleTimeout: metav1.Duration{Duration: 123 * time.Millisecond}, NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"}, + DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR, } options := NewOptions() @@ -304,7 +307,7 @@ nodePortAddresses: assert.NoError(t, err, "unexpected error for %s: %v", tc.name, err) if !reflect.DeepEqual(expected, config) { - t.Fatalf("unexpected config for %s, diff = %s", tc.name, diff.ObjectDiff(config, expected)) + t.Fatalf("unexpected config for %s, diff = %s", tc.name, cmp.Diff(config, expected)) } } } @@ -474,6 +477,7 @@ mode: "" nodePortAddresses: null oomScoreAdj: -999 portRange: "" +detectLocalMode: "ClusterCIDR" udpIdleTimeout: 250ms`) if err != nil { return nil, "", fmt.Errorf("unexpected error when writing content to temp kube-proxy config file: %v", err) diff --git a/cmd/kubeadm/app/componentconfigs/kubeproxy_test.go b/cmd/kubeadm/app/componentconfigs/kubeproxy_test.go index f29a8d0c8e9..67f46c56a26 100644 --- a/cmd/kubeadm/app/componentconfigs/kubeproxy_test.go +++ b/cmd/kubeadm/app/componentconfigs/kubeproxy_test.go @@ -63,6 +63,7 @@ var kubeProxyMarshalCases = []struct { min: null tcpCloseWaitTimeout: null tcpEstablishedTimeout: null + detectLocalMode: "" enableProfiling: false healthzBindAddress: "" hostnameOverride: "" @@ -118,6 +119,7 @@ var kubeProxyMarshalCases = []struct { min: null tcpCloseWaitTimeout: null tcpEstablishedTimeout: null + detectLocalMode: "" enableProfiling: true healthzBindAddress: "" hostnameOverride: "" diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index eee6b31f2bd..f58b5af4b7b 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -26,6 +26,7 @@ go_library( "//pkg/proxy:go_default_library", "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/oom:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 448a86bb9d5..2ee81999a27 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilnode "k8s.io/kubernetes/pkg/util/node" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -94,7 +95,7 @@ func NewHollowProxyOrDie( proxierMinSyncPeriod, false, 0, - "10.0.0.0/8", + proxyutiliptables.NewNoOpLocalDetector(), nodeName, nodeIP, recorder, diff --git a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml index 0cbcfe066d4..92f46469c64 100644 --- a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml +++ b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml @@ -13,6 +13,7 @@ conntrack: min: 131072 tcpCloseWaitTimeout: 1h0m0s tcpEstablishedTimeout: 24h0m0s +detectLocalMode: "" enableProfiling: false healthzBindAddress: 0.0.0.0:10256 hostnameOverride: "" diff --git a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml index 0cbcfe066d4..92f46469c64 100644 --- a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml +++ b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml @@ -13,6 +13,7 @@ conntrack: min: 131072 tcpCloseWaitTimeout: 1h0m0s tcpEstablishedTimeout: 24h0m0s +detectLocalMode: "" enableProfiling: false healthzBindAddress: 0.0.0.0:10256 hostnameOverride: "" diff --git a/pkg/proxy/apis/config/types.go b/pkg/proxy/apis/config/types.go index 7303b6e239f..a5c578a71ce 100644 --- a/pkg/proxy/apis/config/types.go +++ b/pkg/proxy/apis/config/types.go @@ -164,6 +164,8 @@ type KubeProxyConfiguration struct { Winkernel KubeProxyWinkernelConfiguration // ShowHiddenMetricsForVersion is the version for which you want to show hidden metrics. ShowHiddenMetricsForVersion string + // DetectLocalMode determines mode to use for detecting local traffic, defaults to LocalModeClusterCIDR + DetectLocalMode LocalMode } // Currently, three modes of proxy are available in Linux platform: 'userspace' (older, going to be EOL), 'iptables' @@ -188,6 +190,14 @@ const ( ProxyModeKernelspace ProxyMode = "kernelspace" ) +// LocalMode represents modes to detect local traffic from the node +type LocalMode string + +// Currently supported modes for LocalMode +const ( + LocalModeClusterCIDR LocalMode = "ClusterCIDR" +) + // IPVSSchedulerMethod is the algorithm for allocating TCP connections and // UDP datagrams to real servers. Scheduling algorithms are imple- //wanted as kernel modules. Ten are shipped with the Linux Virtual Server. @@ -246,6 +256,22 @@ func (m *ProxyMode) Type() string { return "ProxyMode" } +func (m *LocalMode) Set(s string) error { + *m = LocalMode(s) + return nil +} + +func (m *LocalMode) String() string { + if m != nil { + return string(*m) + } + return "" +} + +func (m *LocalMode) Type() string { + return "LocalMode" +} + type ConfigurationMap map[string]string func (m *ConfigurationMap) String() string { diff --git a/pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go b/pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go index 73a95d249f0..55e035ce0b1 100644 --- a/pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go +++ b/pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go @@ -121,6 +121,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_config_KubeProxyConfiguratio return err } out.ShowHiddenMetricsForVersion = in.ShowHiddenMetricsForVersion + out.DetectLocalMode = config.LocalMode(in.DetectLocalMode) return nil } @@ -159,6 +160,7 @@ func autoConvert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguratio return err } out.ShowHiddenMetricsForVersion = in.ShowHiddenMetricsForVersion + out.DetectLocalMode = v1alpha1.LocalMode(in.DetectLocalMode) return nil } diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index cac0ab4c220..24f842e3d2d 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -17,6 +17,7 @@ go_library( "//pkg/proxy/metaproxier:go_default_library", "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/async:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", @@ -41,6 +42,7 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/proxy/util/testing:go_default_library", "//pkg/util/async:go_default_library", "//pkg/util/conntrack:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index f395200ea7f..e223f0f9e8f 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -33,7 +33,7 @@ import ( "sync/atomic" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/conntrack" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -201,7 +202,7 @@ type Proxier struct { masqueradeAll bool masqueradeMark string exec utilexec.Interface - clusterCIDR string + localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP portMapper utilproxy.PortOpener @@ -260,7 +261,7 @@ func NewProxier(ipt utiliptables.Interface, minSyncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, - clusterCIDR string, + localDetector proxyutiliptables.LocalTrafficDetector, hostname string, nodeIP net.IP, recorder record.EventRecorder, @@ -285,12 +286,6 @@ func NewProxier(ipt utiliptables.Interface, masqueradeValue := 1 << uint(masqueradeBit) masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) - if len(clusterCIDR) == 0 { - klog.Warning("clusterCIDR not specified, unable to distinguish between internal and external traffic") - } else if utilnet.IsIPv6CIDRString(clusterCIDR) != ipt.IsIpv6() { - return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, ipt.IsIpv6()) - } - endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) @@ -307,7 +302,7 @@ func NewProxier(ipt utiliptables.Interface, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, exec: exec, - clusterCIDR: clusterCIDR, + localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, portMapper: &listenPortOpener{}, @@ -345,7 +340,7 @@ func NewDualStackProxier( minSyncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, - clusterCIDR [2]string, + localDetectors [2]proxyutiliptables.LocalTrafficDetector, hostname string, nodeIP [2]net.IP, recorder record.EventRecorder, @@ -354,17 +349,15 @@ func NewDualStackProxier( ) (proxy.Provider, error) { // Create an ipv4 instance of the single-stack proxier ipv4Proxier, err := NewProxier(ipt[0], sysctl, - exec, syncPeriod, minSyncPeriod, - masqueradeAll, masqueradeBit, clusterCIDR[0], hostname, nodeIP[0], - recorder, healthzServer, nodePortAddresses) + exec, syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname, + nodeIP[0], recorder, healthzServer, nodePortAddresses) if err != nil { return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err) } ipv6Proxier, err := NewProxier(ipt[1], sysctl, - exec, syncPeriod, minSyncPeriod, - masqueradeAll, masqueradeBit, clusterCIDR[1], hostname, nodeIP[1], - recorder, healthzServer, nodePortAddresses) + exec, syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname, + nodeIP[1], recorder, healthzServer, nodePortAddresses) if err != nil { return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err) } @@ -1014,13 +1007,13 @@ func (proxier *Proxier) syncProxyRules() { ) if proxier.masqueradeAll { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { + } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) } writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { @@ -1374,16 +1367,14 @@ func (proxier *Proxier) syncProxyRules() { // First rule in the chain redirects all pod -> external VIP traffic to the // Service's ClusterIP instead. This happens whether or not we have local - // endpoints; only if clusterCIDR is specified - if len(proxier.clusterCIDR) > 0 { + // endpoints; only if localDetector is implemented + if proxier.localDetector.IsImplemented() { args = append(args[:0], "-A", string(svcXlbChain), "-m", "comment", "--comment", `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, - "-s", proxier.clusterCIDR, - "-j", string(svcChain), ) - writeLine(proxier.natRules, args...) + writeLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...) } // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local @@ -1513,29 +1504,23 @@ func (proxier *Proxier) syncProxyRules() { "-j", "ACCEPT", ) - // The following rules can only be set if clusterCIDR has been defined. - if len(proxier.clusterCIDR) != 0 { - // The following two rules ensure the traffic after the initial packet - // accepted by the "kubernetes forwarding rules" rule above will be - // accepted, to be as specific as possible the traffic must be sourced - // or destined to the clusterCIDR (to/from a pod). - writeLine(proxier.filterRules, - "-A", string(kubeForwardChain), - "-s", proxier.clusterCIDR, - "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, - "-m", "conntrack", - "--ctstate", "RELATED,ESTABLISHED", - "-j", "ACCEPT", - ) - writeLine(proxier.filterRules, - "-A", string(kubeForwardChain), - "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, - "-d", proxier.clusterCIDR, - "-m", "conntrack", - "--ctstate", "RELATED,ESTABLISHED", - "-j", "ACCEPT", - ) - } + // The following two rules ensure the traffic after the initial packet + // accepted by the "kubernetes forwarding rules" rule above will be + // accepted. + writeLine(proxier.filterRules, + "-A", string(kubeForwardChain), + "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, + "-m", "conntrack", + "--ctstate", "RELATED,ESTABLISHED", + "-j", "ACCEPT", + ) + writeLine(proxier.filterRules, + "-A", string(kubeForwardChain), + "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, + "-m", "conntrack", + "--ctstate", "RELATED,ESTABLISHED", + "-j", "ACCEPT", + ) // Write the end-of-table markers. writeLine(proxier.filterRules, "COMMIT") diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 8528fc8e2de..f1fc7af1418 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/conntrack" @@ -345,6 +346,7 @@ const testHostname = "test-hostname" func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. + detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/24", ipt) p := &Proxier{ exec: &fakeexec.FakeExec{}, serviceMap: make(proxy.ServiceMap), @@ -352,7 +354,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled), iptables: ipt, - clusterCIDR: "10.0.0.0/24", + localDetector: detectLocal, hostname: testHostname, portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, @@ -958,8 +960,6 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt, false) - // set cluster CIDR to empty before test - fp.clusterCIDR = "" onlyLocalNodePorts(t, fp, ipt) } @@ -2342,8 +2342,8 @@ func TestEndpointSliceE2E(t *testing.T) { :KUBE-FORWARD - [0:0] -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT --A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT --A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT *nat :KUBE-SERVICES - [0:0] diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 7b276fdcdea..67c6d4c1597 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -19,6 +19,7 @@ go_test( "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/proxy/util/testing:go_default_library", "//pkg/util/async:go_default_library", "//pkg/util/ipset:go_default_library", @@ -59,6 +60,7 @@ go_library( "//pkg/proxy/metaproxier:go_default_library", "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/proxy/util/iptables:go_default_library", "//pkg/util/async:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/ipset:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 31823697355..5b3eda42960 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/conntrack" utilipset "k8s.io/kubernetes/pkg/util/ipset" @@ -226,7 +227,7 @@ type Proxier struct { exec utilexec.Interface masqueradeAll bool masqueradeMark string - clusterCIDR string + localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP portMapper utilproxy.PortOpener @@ -333,7 +334,7 @@ func NewProxier(ipt utiliptables.Interface, udpTimeout time.Duration, masqueradeAll bool, masqueradeBit int, - clusterCIDR string, + localDetector proxyutiliptables.LocalTrafficDetector, hostname string, nodeIP net.IP, recorder record.EventRecorder, @@ -423,12 +424,6 @@ func NewProxier(ipt utiliptables.Interface, klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) - if len(clusterCIDR) == 0 { - klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") - } else if utilnet.IsIPv6CIDRString(clusterCIDR) != isIPv6 { - return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6) - } - if len(scheduler) == 0 { klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler) scheduler = DefaultScheduler @@ -451,7 +446,7 @@ func NewProxier(ipt utiliptables.Interface, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, exec: exec, - clusterCIDR: clusterCIDR, + localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, portMapper: &listenPortOpener{}, @@ -501,7 +496,7 @@ func NewDualStackProxier( udpTimeout time.Duration, masqueradeAll bool, masqueradeBit int, - clusterCIDR [2]string, + localDetectors [2]proxyutiliptables.LocalTrafficDetector, hostname string, nodeIP [2]net.IP, recorder record.EventRecorder, @@ -516,7 +511,7 @@ func NewDualStackProxier( ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl, exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP, tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit, - clusterCIDR[0], hostname, nodeIP[0], + localDetectors[0], hostname, nodeIP[0], recorder, healthzServer, scheduler, nodePortAddresses) if err != nil { return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err) @@ -525,7 +520,7 @@ func NewDualStackProxier( ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl, exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP, tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit, - clusterCIDR[1], hostname, nodeIP[1], + localDetectors[1], hostname, nodeIP[1], nil, nil, scheduler, nodePortAddresses) if err != nil { return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err) @@ -1654,13 +1649,13 @@ func (proxier *Proxier) writeIptablesRules() { ) if proxier.masqueradeAll { writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { + } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, append(args, "dst,dst", "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) } else { // Masquerade all OUTPUT traffic coming from a service ip. // The kube dummy interface has all service VIPs assigned which @@ -1730,29 +1725,23 @@ func (proxier *Proxier) writeIptablesRules() { "-j", "ACCEPT", ) - // The following rules can only be set if clusterCIDR has been defined. - if len(proxier.clusterCIDR) != 0 { - // The following two rules ensure the traffic after the initial packet - // accepted by the "kubernetes forwarding rules" rule above will be - // accepted, to be as specific as possible the traffic must be sourced - // or destined to the clusterCIDR (to/from a pod). - writeLine(proxier.filterRules, - "-A", string(KubeForwardChain), - "-s", proxier.clusterCIDR, - "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, - "-m", "conntrack", - "--ctstate", "RELATED,ESTABLISHED", - "-j", "ACCEPT", - ) - writeLine(proxier.filterRules, - "-A", string(KubeForwardChain), - "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, - "-d", proxier.clusterCIDR, - "-m", "conntrack", - "--ctstate", "RELATED,ESTABLISHED", - "-j", "ACCEPT", - ) - } + // The following two rules ensure the traffic after the initial packet + // accepted by the "kubernetes forwarding rules" rule above will be + // accepted. + writeLine(proxier.filterRules, + "-A", string(KubeForwardChain), + "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, + "-m", "conntrack", + "--ctstate", "RELATED,ESTABLISHED", + "-j", "ACCEPT", + ) + writeLine(proxier.filterRules, + "-A", string(KubeForwardChain), + "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, + "-m", "conntrack", + "--ctstate", "RELATED,ESTABLISHED", + "-j", "ACCEPT", + ) // Write the end-of-table markers. writeLine(proxier.filterRules, "COMMIT") diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index e5d4175af97..8e3f36bf751 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" utilipset "k8s.io/kubernetes/pkg/util/ipset" @@ -125,8 +126,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u iptables: ipt, ipvs: ipvs, ipset: ipset, - clusterCIDR: "10.0.0.0/24", strictARP: false, + localDetector: proxyutiliptables.NewNoOpLocalDetector(), hostname: testHostname, portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD index c259569132f..8f17dea73f8 100644 --- a/pkg/proxy/util/BUILD +++ b/pkg/proxy/util/BUILD @@ -50,6 +50,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//pkg/proxy/util/iptables:all-srcs", "//pkg/proxy/util/testing:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/proxy/util/iptables/BUILD b/pkg/proxy/util/iptables/BUILD new file mode 100644 index 00000000000..bca5202699a --- /dev/null +++ b/pkg/proxy/util/iptables/BUILD @@ -0,0 +1,37 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["traffic.go"], + importpath = "k8s.io/kubernetes/pkg/proxy/util/iptables", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/iptables:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["traffic_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/util/iptables:go_default_library", + "//pkg/util/iptables/testing:go_default_library", + ], +) diff --git a/pkg/proxy/util/iptables/traffic.go b/pkg/proxy/util/iptables/traffic.go new file mode 100644 index 00000000000..1ec572921fe --- /dev/null +++ b/pkg/proxy/util/iptables/traffic.go @@ -0,0 +1,93 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "fmt" + "net" + + "k8s.io/klog" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + utilnet "k8s.io/utils/net" +) + +// LocalTrafficDetector in a interface to take action (jump) based on whether traffic originated locally +// at the node or not +type LocalTrafficDetector interface { + // IsImplemented returns true if the implementation does something, false otherwise + IsImplemented() bool + + // JumpIfLocal appends conditions to jump to a target chain if traffic detected to be + // of local origin + JumpIfLocal(args []string, toChain string) []string + + // JumpINotfLocal appends conditions to jump to a target chain if traffic detected not to be + // of local origin + JumpIfNotLocal(args []string, toChain string) []string +} + +type noOpLocalDetector struct{} + +// NewNoOpLocalDetector is a no-op implementation of LocalTrafficDetector +func NewNoOpLocalDetector() LocalTrafficDetector { + return &noOpLocalDetector{} +} + +func (n *noOpLocalDetector) IsImplemented() bool { + return false +} + +func (n *noOpLocalDetector) JumpIfLocal(args []string, toChain string) []string { + return args // no-op +} + +func (n *noOpLocalDetector) JumpIfNotLocal(args []string, toChain string) []string { + return args // no-op +} + +type detectLocalByCIDR struct { + cidr string +} + +// NewDetectLocalByCIDR implements the LocalTrafficDetector interface using a CIDR. This can be used when a single CIDR +// range can be used to capture the notion of local traffic. +func NewDetectLocalByCIDR(cidr string, ipt utiliptables.Interface) (LocalTrafficDetector, error) { + if utilnet.IsIPv6CIDRString(cidr) != ipt.IsIpv6() { + return nil, fmt.Errorf("CIDR %s has incorrect IP version: expect isIPv6=%t", cidr, ipt.IsIpv6()) + } + _, _, err := net.ParseCIDR(cidr) + if err != nil { + return nil, err + } + return &detectLocalByCIDR{cidr: cidr}, nil +} + +func (d *detectLocalByCIDR) IsImplemented() bool { + return true +} + +func (d *detectLocalByCIDR) JumpIfLocal(args []string, toChain string) []string { + line := append(args, "-s", d.cidr, "-j", toChain) + klog.V(4).Info("[DetectLocalByCIDR (", d.cidr, ")", " Jump Local: ", line) + return line +} + +func (d *detectLocalByCIDR) JumpIfNotLocal(args []string, toChain string) []string { + line := append(args, "!", "-s", d.cidr, "-j", toChain) + klog.V(4).Info("[DetectLocalByCIDR (", d.cidr, ")]", " Jump Not Local: ", line) + return line +} diff --git a/pkg/proxy/util/iptables/traffic_test.go b/pkg/proxy/util/iptables/traffic_test.go new file mode 100644 index 00000000000..1ecee9b200f --- /dev/null +++ b/pkg/proxy/util/iptables/traffic_test.go @@ -0,0 +1,168 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "reflect" + "testing" + + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" +) + +func TestNoOpLocalDetector(t *testing.T) { + cases := []struct { + chain string + args []string + expectedJumpIfOutput []string + expectedJumpIfNotOutput []string + }{ + { + chain: "TEST", + args: []string{"arg1", "arg2"}, + expectedJumpIfOutput: []string{"arg1", "arg2"}, + expectedJumpIfNotOutput: []string{"arg1", "arg2"}, + }, + } + for _, c := range cases { + localDetector := NewNoOpLocalDetector() + if localDetector.IsImplemented() { + t.Error("DetectLocalByCIDR returns true for IsImplemented") + } + + jumpIf := localDetector.JumpIfLocal(c.args, c.chain) + jumpIfNot := localDetector.JumpIfNotLocal(c.args, c.chain) + + if !reflect.DeepEqual(jumpIf, c.expectedJumpIfOutput) { + t.Errorf("JumpIf, expected: '%v', but got: '%v'", c.expectedJumpIfOutput, jumpIf) + } + + if !reflect.DeepEqual(jumpIfNot, c.expectedJumpIfNotOutput) { + t.Errorf("JumpIfNot, expected: '%v', but got: '%v'", c.expectedJumpIfNotOutput, jumpIfNot) + } + } +} + +func TestNewDetectLocalByCIDR(t *testing.T) { + cases := []struct { + cidr string + ipt utiliptables.Interface + errExpected bool + }{ + { + cidr: "10.0.0.0/14", + ipt: iptablestest.NewFake(), + errExpected: false, + }, + { + cidr: "2002::1234:abcd:ffff:c0a8:101/64", + ipt: iptablestest.NewIpv6Fake(), + errExpected: false, + }, + { + cidr: "10.0.0.0/14", + ipt: iptablestest.NewIpv6Fake(), + errExpected: true, + }, + { + cidr: "2002::1234:abcd:ffff:c0a8:101/64", + ipt: iptablestest.NewFake(), + errExpected: true, + }, + { + cidr: "10.0.0.0", + ipt: iptablestest.NewFake(), + errExpected: true, + }, + { + cidr: "2002::1234:abcd:ffff:c0a8:101", + ipt: iptablestest.NewIpv6Fake(), + errExpected: true, + }, + { + cidr: "", + ipt: iptablestest.NewFake(), + errExpected: true, + }, + { + cidr: "", + ipt: iptablestest.NewIpv6Fake(), + errExpected: true, + }, + } + for i, c := range cases { + r, err := NewDetectLocalByCIDR(c.cidr, c.ipt) + if c.errExpected { + if err == nil { + t.Errorf("Case[%d] expected error, but succeeded with: %q", i, r) + } + continue + } + if err != nil { + t.Errorf("Case[%d] failed with error: %v", i, err) + } + } +} + +func TestDetectLocalByCIDR(t *testing.T) { + cases := []struct { + cidr string + ipt utiliptables.Interface + chain string + args []string + expectedJumpIfOutput []string + expectedJumpIfNotOutput []string + }{ + { + cidr: "10.0.0.0/14", + ipt: iptablestest.NewFake(), + chain: "TEST", + args: []string{"arg1", "arg2"}, + expectedJumpIfOutput: []string{"arg1", "arg2", "-s", "10.0.0.0/14", "-j", "TEST"}, + expectedJumpIfNotOutput: []string{"arg1", "arg2", "!", "-s", "10.0.0.0/14", "-j", "TEST"}, + }, + { + cidr: "2002::1234:abcd:ffff:c0a8:101/64", + ipt: iptablestest.NewIpv6Fake(), + chain: "TEST", + args: []string{"arg1", "arg2"}, + expectedJumpIfOutput: []string{"arg1", "arg2", "-s", "2002::1234:abcd:ffff:c0a8:101/64", "-j", "TEST"}, + expectedJumpIfNotOutput: []string{"arg1", "arg2", "!", "-s", "2002::1234:abcd:ffff:c0a8:101/64", "-j", "TEST"}, + }, + } + for _, c := range cases { + localDetector, err := NewDetectLocalByCIDR(c.cidr, c.ipt) + if err != nil { + t.Errorf("Error initializing localDetector: %v", err) + continue + } + if !localDetector.IsImplemented() { + t.Error("DetectLocalByCIDR returns false for IsImplemented") + } + + jumpIf := localDetector.JumpIfLocal(c.args, c.chain) + jumpIfNot := localDetector.JumpIfNotLocal(c.args, c.chain) + + if !reflect.DeepEqual(jumpIf, c.expectedJumpIfOutput) { + t.Errorf("JumpIf, expected: '%v', but got: '%v'", c.expectedJumpIfOutput, jumpIf) + } + + if !reflect.DeepEqual(jumpIfNot, c.expectedJumpIfNotOutput) { + t.Errorf("JumpIfNot, expected: '%v', but got: '%v'", c.expectedJumpIfNotOutput, jumpIfNot) + } + } +} diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go index 8d365708ed8..199bbb36b9f 100644 --- a/pkg/util/iptables/testing/fake.go +++ b/pkg/util/iptables/testing/fake.go @@ -57,6 +57,7 @@ type Rule map[string]string type FakeIPTables struct { hasRandomFully bool Lines []byte + ipv6 bool } // NewFake returns a no-op iptables.Interface @@ -64,6 +65,11 @@ func NewFake() *FakeIPTables { return &FakeIPTables{} } +// NewIpv6Fake returns a no-op iptables.Interface with IsIPv6() == true +func NewIpv6Fake() *FakeIPTables { + return &FakeIPTables{ipv6: true} +} + // SetHasRandomFully is part of iptables.Interface func (f *FakeIPTables) SetHasRandomFully(can bool) *FakeIPTables { f.hasRandomFully = can @@ -96,8 +102,8 @@ func (*FakeIPTables) DeleteRule(table iptables.Table, chain iptables.Chain, args } // IsIpv6 is part of iptables.Interface -func (*FakeIPTables) IsIpv6() bool { - return false +func (f *FakeIPTables) IsIpv6() bool { + return f.ipv6 } // Save is part of iptables.Interface diff --git a/staging/src/k8s.io/kube-proxy/config/v1alpha1/types.go b/staging/src/k8s.io/kube-proxy/config/v1alpha1/types.go index fc31406dbf9..7b29591d966 100644 --- a/staging/src/k8s.io/kube-proxy/config/v1alpha1/types.go +++ b/staging/src/k8s.io/kube-proxy/config/v1alpha1/types.go @@ -160,6 +160,8 @@ type KubeProxyConfiguration struct { Winkernel KubeProxyWinkernelConfiguration `json:"winkernel"` // ShowHiddenMetricsForVersion is the version for which you want to show hidden metrics. ShowHiddenMetricsForVersion string `json:"showHiddenMetricsForVersion"` + // DetectLocalMode determines mode to use for detecting local traffic, defaults to LocalModeClusterCIDR + DetectLocalMode LocalMode `json:"detectLocalMode"` } // Currently, three modes of proxy are available in Linux platform: 'userspace' (older, going to be EOL), 'iptables' @@ -176,3 +178,6 @@ type KubeProxyConfiguration struct { // future). If winkernel proxy is selected, regardless of how, but the Windows kernel can't support this mode of proxy, // this always falls back to the userspace proxy. type ProxyMode string + +// LocalMode represents modes to detect local traffic from the node +type LocalMode string