mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #88786 from freehan/externalIP
Fix ExternalTrafficPolicy support for Service ExternalIPs
This commit is contained in:
commit
0d85d1629c
@ -574,6 +574,13 @@ const (
|
||||
// medium: HugePages-1Gi
|
||||
HugePageStorageMediumSize featuregate.Feature = "HugePageStorageMediumSize"
|
||||
|
||||
// owner: @freehan
|
||||
// GA: v1.18
|
||||
//
|
||||
// Enable ExternalTrafficPolicy for Service ExternalIPs.
|
||||
// This is for bug fix #69811
|
||||
ExternalPolicyForExternalIP featuregate.Feature = "ExternalPolicyForExternalIP"
|
||||
|
||||
// owner: @bswartz
|
||||
// alpha: v1.18
|
||||
//
|
||||
@ -668,6 +675,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
ImmutableEphemeralVolumes: {Default: false, PreRelease: featuregate.Alpha},
|
||||
DefaultIngressClass: {Default: true, PreRelease: featuregate.Beta},
|
||||
HugePageStorageMediumSize: {Default: false, PreRelease: featuregate.Alpha},
|
||||
ExternalPolicyForExternalIP: {Default: false, PreRelease: featuregate.GA}, // remove in 1.19
|
||||
AnyVolumeDataSource: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
|
@ -39,6 +39,7 @@ go_test(
|
||||
srcs = ["proxier_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
@ -53,6 +54,8 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
|
@ -1070,8 +1070,13 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
|
||||
"--dport", strconv.Itoa(svcInfo.Port()),
|
||||
)
|
||||
// We have to SNAT packets to external IPs.
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
|
||||
destChain := svcXlbChain
|
||||
// We have to SNAT packets to external IPs if externalTrafficPolicy is cluster.
|
||||
if !(utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints()) {
|
||||
destChain = svcChain
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
}
|
||||
|
||||
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
|
||||
// nor from a local process to be forwarded to the service.
|
||||
@ -1080,11 +1085,11 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
externalTrafficOnlyArgs := append(args,
|
||||
"-m", "physdev", "!", "--physdev-is-in",
|
||||
"-m", "addrtype", "!", "--src-type", "LOCAL")
|
||||
writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
|
||||
writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(destChain))...)
|
||||
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
|
||||
// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
|
||||
// This covers cases like GCE load-balancers which get added to the local routing table.
|
||||
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
|
||||
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(destChain))...)
|
||||
} else {
|
||||
// No endpoints.
|
||||
writeLine(proxier.filterRules,
|
||||
|
@ -34,6 +34,9 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
@ -832,6 +835,80 @@ func TestExternalIPsReject(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnlyLocalExternalIPs(t *testing.T) {
|
||||
// TODO(freehan): remove this in k8s 1.19
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExternalPolicyForExternalIP, true)()
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt, false)
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcExternalIPs := "50.60.70.81"
|
||||
svcPortName := proxy.ServicePortName{
|
||||
NamespacedName: makeNSN("ns1", "svc1"),
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
|
||||
svc.Spec.Type = "NodePort"
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.ExternalIPs = []string{svcExternalIPs}
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
TargetPort: intstr.FromInt(svcPort),
|
||||
}}
|
||||
}),
|
||||
)
|
||||
makeEndpointsMap(fp)
|
||||
epIP1 := "10.180.0.1"
|
||||
epIP2 := "10.180.2.1"
|
||||
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
|
||||
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
|
||||
makeEndpointsMap(fp,
|
||||
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
|
||||
ept.Subsets = []v1.EndpointSubset{{
|
||||
Addresses: []v1.EndpointAddress{{
|
||||
IP: epIP1,
|
||||
NodeName: nil,
|
||||
}, {
|
||||
IP: epIP2,
|
||||
NodeName: utilpointer.StringPtr(testHostname),
|
||||
}},
|
||||
Ports: []v1.EndpointPort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
proto := strings.ToLower(string(v1.ProtocolTCP))
|
||||
lbChain := string(serviceLBChainName(svcPortName.String(), proto))
|
||||
|
||||
nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal))
|
||||
localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal))
|
||||
|
||||
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
|
||||
if !hasJump(kubeSvcRules, lbChain, svcExternalIPs, svcPort) {
|
||||
errorf(fmt.Sprintf("Failed to find jump to xlb chain %v", lbChain), kubeSvcRules, t)
|
||||
}
|
||||
|
||||
lbRules := ipt.GetRules(lbChain)
|
||||
if hasJump(lbRules, nonLocalEpChain, "", 0) {
|
||||
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
|
||||
}
|
||||
if !hasJump(lbRules, localEpChain, "", 0) {
|
||||
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodePortReject(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt, false)
|
||||
|
@ -15,6 +15,7 @@ go_test(
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/ipvs/testing:go_default_library",
|
||||
@ -34,6 +35,8 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec/testing:go_default_library",
|
||||
|
@ -40,6 +40,9 @@ const (
|
||||
kubeExternalIPSetComment = "Kubernetes service external ip + port for masquerade and filter purpose"
|
||||
kubeExternalIPSet = "KUBE-EXTERNAL-IP"
|
||||
|
||||
kubeExternalIPLocalSetComment = "Kubernetes service external ip + port with externalTrafficPolicy=local"
|
||||
kubeExternalIPLocalSet = "KUBE-EXTERNAL-IP-LOCAL"
|
||||
|
||||
kubeLoadBalancerSetComment = "Kubernetes service lb portal"
|
||||
kubeLoadBalancerSet = "KUBE-LOAD-BALANCER"
|
||||
|
||||
|
@ -140,6 +140,7 @@ var ipsetInfo = []struct {
|
||||
{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
|
||||
{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
|
||||
{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
|
||||
{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
|
||||
{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
|
||||
{kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment},
|
||||
{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
|
||||
@ -1236,12 +1237,21 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
Protocol: protocol,
|
||||
SetType: utilipset.HashIPPort,
|
||||
}
|
||||
// We have to SNAT packets to external IPs.
|
||||
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
|
||||
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
|
||||
continue
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints() {
|
||||
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
|
||||
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPLocalSet].Name))
|
||||
continue
|
||||
}
|
||||
proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
|
||||
} else {
|
||||
// We have to SNAT packets to external IPs.
|
||||
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
|
||||
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
|
||||
continue
|
||||
}
|
||||
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
|
||||
}
|
||||
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
|
||||
|
||||
// ipvs call
|
||||
serv := &utilipvs.VirtualServer{
|
||||
@ -1257,7 +1267,12 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if err := proxier.syncService(svcNameString, serv, true); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
|
||||
|
||||
onlyNodeLocalEndpoints := false
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) {
|
||||
onlyNodeLocalEndpoints = svcInfo.OnlyNodeLocalEndpoints()
|
||||
}
|
||||
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, serv); err != nil {
|
||||
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
} else {
|
||||
@ -1668,15 +1683,8 @@ func (proxier *Proxier) writeIptablesRules() {
|
||||
}
|
||||
}
|
||||
|
||||
if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
|
||||
// Build masquerade rules for packets to external IPs.
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
|
||||
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
|
||||
"dst,dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
// externalIPRules adds iptables rules applies to Service ExternalIPs
|
||||
externalIPRules := func(args []string) {
|
||||
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
|
||||
// nor from a local process to be forwarded to the service.
|
||||
// This rule roughly translates to "all traffic from off-machine".
|
||||
@ -1691,6 +1699,28 @@ func (proxier *Proxier) writeIptablesRules() {
|
||||
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
|
||||
}
|
||||
|
||||
if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
|
||||
// Build masquerade rules for packets to external IPs.
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
|
||||
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
|
||||
"dst,dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
externalIPRules(args)
|
||||
}
|
||||
|
||||
if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(),
|
||||
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name,
|
||||
"dst,dst",
|
||||
)
|
||||
externalIPRules(args)
|
||||
}
|
||||
|
||||
// -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
|
@ -33,6 +33,9 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
|
||||
@ -1256,6 +1259,92 @@ func TestExternalIPs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnlyLocalExternalIPs(t *testing.T) {
|
||||
// TODO(freehan): remove this in k8s 1.19
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExternalPolicyForExternalIP, true)()
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1")
|
||||
svcPortName := proxy.ServicePortName{
|
||||
NamespacedName: makeNSN("ns1", "svc1"),
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
|
||||
svc.Spec.Type = "NodePort"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList()
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
TargetPort: intstr.FromInt(svcPort),
|
||||
}}
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
}),
|
||||
)
|
||||
epIP := "10.180.0.1"
|
||||
epIP1 := "10.180.1.1"
|
||||
thisHostname := testHostname
|
||||
otherHostname := "other-hostname"
|
||||
makeEndpointsMap(fp,
|
||||
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
|
||||
ept.Subsets = []v1.EndpointSubset{{
|
||||
Addresses: []v1.EndpointAddress{{
|
||||
IP: epIP,
|
||||
NodeName: utilpointer.StringPtr(thisHostname),
|
||||
},
|
||||
{
|
||||
IP: epIP1,
|
||||
NodeName: utilpointer.StringPtr(otherHostname),
|
||||
},
|
||||
},
|
||||
Ports: []v1.EndpointPort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
// check ipvs service and destinations
|
||||
services, err := ipvs.GetVirtualServers()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get ipvs services, err: %v", err)
|
||||
}
|
||||
if len(services) != 4 {
|
||||
t.Errorf("Expect 4 ipvs services, got %d", len(services))
|
||||
}
|
||||
found := false
|
||||
for _, svc := range services {
|
||||
if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
|
||||
found = true
|
||||
destinations, _ := ipvs.GetRealServers(svc)
|
||||
if len(destinations) != 1 {
|
||||
t.Errorf("Expect only 1 local endpoint. but got %v", len(destinations))
|
||||
}
|
||||
for _, dest := range destinations {
|
||||
if dest.Address.String() != epIP || dest.Port != uint16(svcPort) {
|
||||
t.Errorf("service Endpoint mismatch ipvs service destination")
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Expect external ip type service, got none")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBalancer(t *testing.T) {
|
||||
ipt, fp := buildFakeProxier()
|
||||
svcIP := "10.20.30.41"
|
||||
@ -1432,6 +1521,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
|
||||
}
|
||||
checkIptables(t, ipt, epIpt)
|
||||
}
|
||||
|
||||
func TestLoadBalanceSourceRanges(t *testing.T) {
|
||||
ipt, fp := buildFakeProxier()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user