diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 97b739d05ff..da3286011ef 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -574,6 +574,13 @@ const ( // medium: HugePages-1Gi HugePageStorageMediumSize featuregate.Feature = "HugePageStorageMediumSize" + // owner: @freehan + // GA: v1.18 + // + // Enable ExternalTrafficPolicy for Service ExternalIPs. + // This is for bug fix #69811 + ExternalPolicyForExternalIP featuregate.Feature = "ExternalPolicyForExternalIP" + // owner: @bswartz // alpha: v1.18 // @@ -668,6 +675,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS ImmutableEphemeralVolumes: {Default: false, PreRelease: featuregate.Alpha}, DefaultIngressClass: {Default: true, PreRelease: featuregate.Beta}, HugePageStorageMediumSize: {Default: false, PreRelease: featuregate.Alpha}, + ExternalPolicyForExternalIP: {Default: false, PreRelease: featuregate.GA}, // remove in 1.19 AnyVolumeDataSource: {Default: false, PreRelease: featuregate.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 24f842e3d2d..1773b637e91 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -39,6 +39,7 @@ go_test( srcs = ["proxier_test.go"], embed = [":go_default_library"], deps = [ + "//pkg/features:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/util:go_default_library", @@ -53,6 +54,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", 
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e223f0f9e8f..b7d4292dace 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1070,8 +1070,13 @@ func (proxier *Proxier) syncProxyRules() { "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), "--dport", strconv.Itoa(svcInfo.Port()), ) - // We have to SNAT packets to external IPs. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + + destChain := svcXlbChain + // We have to SNAT packets to external IPs if externalTrafficPolicy is cluster. + if !(utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints()) { + destChain = svcChain + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. @@ -1080,11 +1085,11 @@ func (proxier *Proxier) syncProxyRules() { externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) + writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(destChain))...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) + writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(destChain))...) } else { // No endpoints. 
writeLine(proxier.filterRules, diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index f1fc7af1418..ba414dc7350 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -34,6 +34,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" @@ -832,6 +835,80 @@ func TestExternalIPsReject(t *testing.T) { } } +func TestOnlyLocalExternalIPs(t *testing.T) { + // TODO(freehan): remove this in k8s 1.19 + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExternalPolicyForExternalIP, true)() + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, false) + svcIP := "10.20.30.41" + svcPort := 80 + svcExternalIPs := "50.60.70.81" + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + svc.Spec.ClusterIP = svcIP + svc.Spec.ExternalIPs = []string{svcExternalIPs} + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + TargetPort: intstr.FromInt(svcPort), + }} + }), + ) + makeEndpointsMap(fp) + epIP1 := "10.180.0.1" + epIP2 := "10.180.2.1" + epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) + epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: 
[]v1.EndpointAddress{{ + IP: epIP1, + NodeName: nil, + }, { + IP: epIP2, + NodeName: utilpointer.StringPtr(testHostname), + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + ) + + fp.syncProxyRules() + + proto := strings.ToLower(string(v1.ProtocolTCP)) + lbChain := string(serviceLBChainName(svcPortName.String(), proto)) + + nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal)) + localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal)) + + kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) + if !hasJump(kubeSvcRules, lbChain, svcExternalIPs, svcPort) { + errorf(fmt.Sprintf("Failed to find jump to xlb chain %v", lbChain), kubeSvcRules, t) + } + + lbRules := ipt.GetRules(lbChain) + if hasJump(lbRules, nonLocalEpChain, "", 0) { + errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t) + } + if !hasJump(lbRules, localEpChain, "", 0) { + errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t) + } +} + func TestNodePortReject(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt, false) diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 67c6d4c1597..9da47dbd4f7 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -15,6 +15,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//pkg/features:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", @@ -34,6 +35,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + 
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 419b1a03598..7b75ec879c2 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -40,6 +40,9 @@ const ( kubeExternalIPSetComment = "Kubernetes service external ip + port for masquerade and filter purpose" kubeExternalIPSet = "KUBE-EXTERNAL-IP" + kubeExternalIPLocalSetComment = "Kubernetes service external ip + port with externalTrafficPolicy=local" + kubeExternalIPLocalSet = "KUBE-EXTERNAL-IP-LOCAL" + kubeLoadBalancerSetComment = "Kubernetes service lb portal" kubeLoadBalancerSet = "KUBE-LOAD-BALANCER" diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 5b3eda42960..cbe0545d493 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -140,6 +140,7 @@ var ipsetInfo = []struct { {kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment}, {kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment}, {kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment}, + {kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment}, {kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment}, {kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment}, {kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment}, @@ -1236,12 +1237,21 @@ func (proxier *Proxier) syncProxyRules() { Protocol: protocol, SetType: utilipset.HashIPPort, } - // We have to SNAT packets to external IPs. 
- if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name)) - continue + + if utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints() { + if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPLocalSet].Name)) + continue + } + proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String()) + } else { + // We have to SNAT packets to external IPs. + if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name)) + continue + } + proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String()) } - proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ @@ -1257,7 +1267,12 @@ func (proxier *Proxier) syncProxyRules() { if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true - if err := proxier.syncEndpoint(svcName, false, serv); err != nil { + + onlyNodeLocalEndpoints := false + if utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) { + onlyNodeLocalEndpoints = svcInfo.OnlyNodeLocalEndpoints() + } + if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { @@ -1668,15 +1683,8 @@ func (proxier *Proxier) writeIptablesRules() { } } - if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { - // Build masquerade rules for packets to external IPs. 
- args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(), - "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + // externalIPRules adds the iptables rules that apply to Service ExternalIPs + externalIPRules := func(args []string) { // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. // This rule roughly translates to "all traffic from off-machine". @@ -1691,6 +1699,28 @@ func (proxier *Proxier) writeIptablesRules() { writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) } + if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { + // Build masquerade rules for packets to external IPs. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(), + "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) 
+ externalIPRules(args) + } + + if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() { + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(), + "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name, + "dst,dst", + ) + externalIPRules(args) + } + // -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT args = append(args[:0], "-A", string(kubeServicesChain), diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 8e3f36bf751..53a056ab6ad 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -33,6 +33,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" @@ -1256,6 +1259,92 @@ func TestExternalIPs(t *testing.T) { } } +func TestOnlyLocalExternalIPs(t *testing.T) { + // TODO(freehan): remove this in k8s 1.19 + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExternalPolicyForExternalIP, true)() + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) + svcIP := "10.20.30.41" + svcPort := 80 + svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1") + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList() + svc.Spec.Ports = 
[]v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + TargetPort: intstr.FromInt(svcPort), + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }), + ) + epIP := "10.180.0.1" + epIP1 := "10.180.1.1" + thisHostname := testHostname + otherHostname := "other-hostname" + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIP, + NodeName: utilpointer.StringPtr(thisHostname), + }, + { + IP: epIP1, + NodeName: utilpointer.StringPtr(otherHostname), + }, + }, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + ) + + fp.syncProxyRules() + + // check ipvs service and destinations + services, err := ipvs.GetVirtualServers() + if err != nil { + t.Errorf("Failed to get ipvs services, err: %v", err) + } + if len(services) != 4 { + t.Errorf("Expect 4 ipvs services, got %d", len(services)) + } + found := false + for _, svc := range services { + if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) { + found = true + destinations, _ := ipvs.GetRealServers(svc) + if len(destinations) != 1 { + t.Errorf("Expect only 1 local endpoint. but got %v", len(destinations)) + } + for _, dest := range destinations { + if dest.Address.String() != epIP || dest.Port != uint16(svcPort) { + t.Errorf("service Endpoint mismatch ipvs service destination") + } + } + break + } + } + if !found { + t.Errorf("Expect external ip type service, got none") + } +} + func TestLoadBalancer(t *testing.T) { ipt, fp := buildFakeProxier() svcIP := "10.20.30.41" @@ -1432,6 +1521,7 @@ func TestOnlyLocalNodePorts(t *testing.T) { } checkIptables(t, ipt, epIpt) } + func TestLoadBalanceSourceRanges(t *testing.T) { ipt, fp := buildFakeProxier()