use ipset for SNAT and packet filtering in the ipvs proxy

m1093782566 2017-11-15 17:20:49 +08:00
parent c124fcf7d7
commit fbf8a13376
7 changed files with 484 additions and 120 deletions

View File

@@ -41,6 +41,7 @@ go_library(
"//pkg/proxy/userspace:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/dbus:go_default_library",
+ "//pkg/util/ipset:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/ipvs:go_default_library",
"//pkg/util/mount:go_default_library",

View File

@@ -63,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/ipvs"
"k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/util/configz"
+ utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -356,6 +357,7 @@ type ProxyServer struct {
EventClient v1core.EventsGetter
IptInterface utiliptables.Interface
IpvsInterface utilipvs.Interface
+ IpsetInterface utilipset.Interface
execer exec.Interface
Proxier proxy.ProxyProvider
Broadcaster record.EventBroadcaster
@@ -422,7 +424,7 @@ func (s *ProxyServer) Run() error {
if s.CleanupAndExit {
encounteredError := userspace.CleanupLeftovers(s.IptInterface)
encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
- encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface) || encounteredError
+ encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface) || encounteredError
if encounteredError {
return errors.New("encountered an error while tearing down rules.")
}

View File

@@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/util/configz"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
+ utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -72,6 +73,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
+ var ipsetInterface utilipset.Interface
var dbus utildbus.Interface
// Create a iptables utils.
@@ -80,6 +82,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
dbus = utildbus.New()
iptInterface = utiliptables.New(execer, dbus, protocol)
ipvsInterface = utilipvs.New(execer)
+ ipsetInterface = utilipset.New(execer)
// We omit creation of pretty much everything if we run in cleanup mode
if cleanupAndExit {
@@ -87,6 +90,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
execer: execer,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
+ IpsetInterface: ipsetInterface,
CleanupAndExit: cleanupAndExit,
}, nil
}
@@ -119,7 +123,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
var serviceEventHandler proxyconfig.ServiceHandler
var endpointsEventHandler proxyconfig.EndpointsHandler
- proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{})
+ proxyMode := getProxyMode(string(config.Mode), iptInterface, ipsetInterface, iptables.LinuxKernelCompatTester{})
if proxyMode == proxyModeIPTables {
glog.V(0).Info("Using iptables Proxier.")
nodeIP := net.ParseIP(config.BindAddress)
@@ -159,12 +163,13 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
userspace.CleanupLeftovers(iptInterface)
// IPVS Proxier will generate some iptables rules,
// need to clean them before switching to other proxy mode.
- ipvs.CleanupLeftovers(ipvsInterface, iptInterface)
+ ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface)
} else if proxyMode == proxyModeIPVS {
glog.V(0).Info("Using ipvs Proxier.")
proxierIPVS, err := ipvs.NewProxier(
iptInterface,
ipvsInterface,
+ ipsetInterface,
utilsysctl.New(),
execer,
config.IPVS.SyncPeriod.Duration,
@@ -220,7 +225,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
iptables.CleanupLeftovers(iptInterface)
// IPVS Proxier will generate some iptables rules,
// need to clean them before switching to other proxy mode.
- ipvs.CleanupLeftovers(ipvsInterface, iptInterface)
+ ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface)
}
iptInterface.AddReloadFunc(proxier.Sync)
@@ -230,6 +235,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
EventClient: eventClient,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
+ IpsetInterface: ipsetInterface,
execer: execer,
Proxier: proxier,
Broadcaster: eventBroadcaster,
@@ -249,7 +255,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
}, nil
}
- func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string {
+ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string {
if proxyMode == proxyModeUserspace {
return proxyModeUserspace
}
@@ -260,7 +266,7 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i
if utilfeature.DefaultFeatureGate.Enabled(features.SupportIPVSProxyMode) {
if proxyMode == proxyModeIPVS {
- return tryIPVSProxy(iptver, kcompat)
+ return tryIPVSProxy(iptver, ipsetver, kcompat)
} else {
glog.Warningf("Can't use ipvs proxier, trying iptables proxier")
return tryIPTablesProxy(iptver, kcompat)
@@ -270,10 +276,10 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i
return tryIPTablesProxy(iptver, kcompat)
}
- func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string {
+ func tryIPVSProxy(iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string {
// guaranteed false on error, error only necessary for debugging
- // IPVS Proxier relies on iptables
- useIPVSProxy, err := ipvs.CanUseIPVSProxier()
+ // IPVS Proxier relies on ipset
+ useIPVSProxy, err := ipvs.CanUseIPVSProxier(ipsetver)
if err != nil {
// Try to fallback to iptables before falling back to userspace
utilruntime.HandleError(fmt.Errorf("can't determine whether to use ipvs proxy, error: %v", err))
@@ -282,8 +288,6 @@ func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelComp
return proxyModeIPVS
}
- // TODO: Check ipvs version
// Try to fallback to iptables before falling back to userspace
glog.V(1).Infof("Can't use ipvs proxier, trying iptables proxier")
return tryIPTablesProxy(iptver, kcompat)

View File

@@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
+ "ipset_test.go",
"proxier_test.go",
],
importpath = "k8s.io/kubernetes/pkg/proxy/ipvs",
@@ -18,6 +19,8 @@ go_test(
"//pkg/proxy:go_default_library",
"//pkg/proxy/ipvs/testing:go_default_library",
"//pkg/proxy/util:go_default_library",
+ "//pkg/util/ipset:go_default_library",
+ "//pkg/util/ipset/testing:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//pkg/util/ipvs:go_default_library",
@@ -35,6 +38,7 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
+ "ipset.go",
"netlink.go",
"netlink_unsupported.go",
"proxier.go",
@@ -55,9 +59,11 @@ go_library(
"//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library",
+ "//pkg/util/ipset:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/ipvs:go_default_library",
"//pkg/util/sysctl:go_default_library",
+ "//pkg/util/version:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
+ utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@@ -59,6 +60,12 @@ const (
// kubeServicesChain is the services portal chain
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
+ // KubeServiceIPSetsChain is the services access IP chain
+ KubeServiceIPSetsChain utiliptables.Chain = "KUBE-SVC-IPSETS"
+ // KubeFireWallChain is the kubernetes firewall chain.
+ KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL"
// kubePostroutingChain is the kubernetes postrouting chain
kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
@@ -67,11 +74,10 @@ const (
// KubeMarkDropChain is the mark-for-drop chain
KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
- )
- const (
// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
DefaultScheduler = "rr"
// DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it.
DefaultDummyDevice = "kube-ipvs0"
)
@@ -117,6 +123,7 @@ type Proxier struct {
minSyncPeriod time.Duration
iptables utiliptables.Interface
ipvs utilipvs.Interface
+ ipset utilipset.Interface
exec utilexec.Interface
masqueradeAll bool
masqueradeMark string
@@ -137,6 +144,26 @@ type Proxier struct {
natRules *bytes.Buffer
// Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle
+ // loopbackSet is the hash:ip,port,ip type ipset that stores all endpoint IP:Port,IP tuples, used for solving the hairpin problem.
+ loopbackSet *IPSet
+ // clusterIPSet is the hash:ip,port type ipset that stores all service ClusterIP:Port pairs.
+ clusterIPSet *IPSet
+ // nodePortSetTCP is the bitmap:port type ipset that stores all TCP node ports.
+ nodePortSetTCP *IPSet
+ // nodePortSetUDP is the bitmap:port type ipset that stores all UDP node ports.
+ nodePortSetUDP *IPSet
+ // externalIPSet is the hash:ip,port type ipset that stores all service ExternalIP:Port pairs.
+ externalIPSet *IPSet
+ // lbIngressSet is the hash:ip,port type ipset that stores all service load balancer ingress IP:Port pairs.
+ lbIngressSet *IPSet
+ // lbMasqSet is the hash:ip,port type ipset that stores all service load balancer ingress IP:Port pairs which need masquerade.
+ lbMasqSet *IPSet
+ // lbWhiteListIPSet is the hash:ip,port,ip type ipset that stores all service load balancer ingress IP:Port,sourceIP pairs; any
+ // packet with a listed source IP visiting the ingress IP:Port can pass through.
+ lbWhiteListIPSet *IPSet
+ // lbWhiteListCIDRSet is the hash:ip,port,net type ipset that stores all service load balancer ingress IP:Port,sourceCIDR pairs; any
+ // packet from a listed source CIDR visiting the ingress IP:Port can pass through.
+ lbWhiteListCIDRSet *IPSet
}
// IPGetter helps get node network interface IP
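The IPSet type behind these new fields is defined in the new pkg/proxy/ipvs/ipset.go, which this diff does not show. Below is a minimal sketch inferred from the call sites in this file (NewIPSet, activeEntries, resetEntries, isEmpty); the field layout, the hash-family constants, and the exact signatures are assumptions:

// Sketch only; assumes sets is k8s.io/apimachinery/pkg/util/sets, already
// imported by this package.
type IPSet struct {
	utilipset.IPSet // embeds Name, SetType, HashFamily, etc.
	// activeEntries holds the entries this sync round wants in the kernel set.
	activeEntries sets.String
	// handle is the injected util ipset interface used to program the kernel.
	handle utilipset.Interface
}

// NewIPSet picks the hash family from isIPv6 and starts with an empty
// active-entry set.
func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, isIPv6 bool) *IPSet {
	hashFamily := utilipset.ProtocolFamilyIPV4
	if isIPv6 {
		hashFamily = utilipset.ProtocolFamilyIPV6
	}
	return &IPSet{
		IPSet: utilipset.IPSet{
			Name:       name,
			SetType:    setType,
			HashFamily: hashFamily,
		},
		activeEntries: sets.NewString(),
		handle:        handle,
	}
}

// isEmpty reports whether the current sync round produced no entries.
func (set *IPSet) isEmpty() bool { return set.activeEntries.Len() == 0 }

// resetEntries clears the active entries at the start of a sync round.
func (set *IPSet) resetEntries() { set.activeEntries = sets.NewString() }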
@@ -184,7 +211,9 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.
- func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
+ func NewProxier(ipt utiliptables.Interface,
+ ipvs utilipvs.Interface,
+ ipset utilipset.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
syncPeriod time.Duration,
@@ -248,32 +277,46 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
+ isIPv6 := utilproxy.IsIPv6(nodeIP)
+ glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
healthzServer: healthzServer,
ipvs: ipvs,
ipvsScheduler: scheduler,
ipGetter: &realIPGetter{},
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
+ ipset: ipset,
+ loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
+ clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
+ externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
+ lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
+ lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6),
+ lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
+ lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
+ nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
+ nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@@ -485,6 +528,11 @@ func (e *endpointsInfo) IPPart() string {
return utilproxy.IPPart(e.endpoint)
}
+ // PortPart returns just the Port part of the endpoint.
+ func (e *endpointsInfo) PortPart() (int, error) {
+ return utilproxy.PortPart(e.endpoint)
+ }
type endpointServicePair struct {
endpoint string
servicePortName proxy.ServicePortName
@@ -652,7 +700,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
// This is determined by checking if all the required kernel modules can be loaded. It may
// return an error if it fails to get the kernel modules information without error, in which
// case it will also return false.
- func CanUseIPVSProxier() (bool, error) {
+ func CanUseIPVSProxier(ipsetver IPSetVersioner) (bool, error) {
// Try to load IPVS required kernel modules using modprobe
for _, kmod := range ipvsModules {
err := utilexec.New().Command("modprobe", "--", kmod).Run()
@@ -677,6 +725,15 @@ func CanUseIPVSProxier() (bool, error) {
if len(modules) != 0 {
return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", modules)
}
+ // Check ipset version
+ versionString, err := ipsetver.GetVersion()
+ if err != nil {
+ return false, fmt.Errorf("error getting ipset version, error: %v", err)
+ }
+ if !checkMinVersion(versionString) {
+ return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
+ }
return true, nil
}
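checkMinVersion and MinIPSetCheckVersion also come from the new ipset.go. A plausible sketch using pkg/util/version, which the BUILD change above pulls in; the concrete minimum version string and the utilversion import alias are assumptions:

// Sketch; assumes utilversion is k8s.io/kubernetes/pkg/util/version and that
// MinIPSetCheckVersion is declared alongside the set-name constants in ipset.go.
func checkMinVersion(vstring string) bool {
	version, err := utilversion.ParseGeneric(vstring)
	if err != nil {
		glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err)
		return false
	}
	minVersion, err := utilversion.ParseGeneric(MinIPSetCheckVersion)
	if err != nil {
		glog.Errorf("MinIPSetCheckVersion (%s) is not a valid version string: %v", MinIPSetCheckVersion, err)
		return false
	}
	return !version.LessThan(minVersion)
}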
@@ -728,7 +785,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
// Start with chains we know we need to remove.
- for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain} {
+ for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain, KubeServiceIPSetsChain} {
if _, found := existingNATChains[chain]; found {
chainString := string(chain)
writeLine(natChains, existingNATChains[chain]) // flush
@@ -748,7 +805,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
}
// CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
- func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (encounteredError bool) {
+ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
// Return immediately when ipvs interface is nil - Probably initialization failed in somewhere.
if ipvs == nil {
return true
@@ -768,6 +825,16 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (enco
}
// Clear iptables created by ipvs Proxier.
encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
+ // Destroy the ip sets created by the ipvs Proxier. This must run after the iptables
+ // cleanup, since an ip set cannot be destroyed while iptables rules still reference it.
+ ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP,
+ KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet}
+ for _, set := range ipSetsToDestroy {
+ err = ipset.DestroySet(set)
+ if err != nil {
+ encounteredError = true
+ }
+ }
return encounteredError
}
@@ -957,6 +1024,16 @@ func (proxier *Proxier) syncProxyRules() {
return
}
+ // Make sure the ip sets exist in the system.
+ ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
+ proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
+ if err := ensureIPSets(ipSets...); err != nil {
+ return
+ }
+ for i := range ipSets {
+ ipSets[i].resetEntries()
+ }
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
// activeIPVSServices represents IPVS service successfully created in this round of sync
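ensureIPSets is likewise defined in ipset.go. A minimal sketch, assuming the second argument of utilipset.Interface.CreateSet means "ignore an already-exists error", so that repeated sync rounds stay idempotent:

// Sketch; create every set used by the proxier, tolerating sets that
// already exist in the kernel.
func ensureIPSets(ipSets ...*IPSet) error {
	for _, set := range ipSets {
		if err := set.handle.CreateSet(&set.IPSet, true); err != nil {
			glog.Errorf("Failed to make sure ip set %v exists, error: %v", set, err)
			return err
		}
	}
	return nil
}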
@@ -976,6 +1053,17 @@ func (proxier *Proxier) syncProxyRules() {
// is just for efficiency, not correctness.
args := make([]string, 64)
+ // Kube service portal
+ if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil {
+ glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
+ return
+ }
+ // Kube service ipset
+ if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
+ glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
+ return
+ }
// Build IPVS rules for each service.
for svcName, svcInfo := range proxier.serviceMap {
protocol := strings.ToLower(string(svcInfo.protocol))
@@ -983,7 +1071,41 @@ func (proxier *Proxier) syncProxyRules() {
// to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String()
+ // Handle traffic that loops back to the originator with SNAT.
+ for _, ep := range proxier.endpointsMap[svcName] {
+ epIP := ep.IPPart()
+ epPort, err := ep.PortPart()
+ // Error parsing this endpoint has been logged. Skip to next endpoint.
+ if epIP == "" || err != nil {
+ continue
+ }
+ entry := &utilipset.Entry{
+ IP: epIP,
+ Port: epPort,
+ Protocol: protocol,
+ IP2: epIP,
+ SetType: utilipset.HashIPPortIP,
+ }
+ proxier.loopbackSet.activeEntries.Insert(entry.String())
+ }
// Capture the clusterIP.
+ // ipset call
+ entry := &utilipset.Entry{
+ IP: svcInfo.clusterIP.String(),
+ Port: svcInfo.port,
+ Protocol: protocol,
+ SetType: utilipset.HashIPPort,
+ }
+ // Add the service Cluster IP:Port to the kubeServiceAccess ip set to help solve hairpin traffic.
+ // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
+ // Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified.
+ if proxier.masqueradeAll {
+ proxier.clusterIPSet.activeEntries.Insert(entry.String())
+ } else if len(proxier.clusterCIDR) > 0 {
+ proxier.clusterIPSet.activeEntries.Insert(entry.String())
+ }
+ // ipvs call
serv := &utilipvs.VirtualServer{
Address: svcInfo.clusterIP,
Port: uint16(svcInfo.port),
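The entries built above are keyed by utilipset.Entry.String(), which serializes an entry into the comma-separated form the ipset CLI understands. A small illustration; the exact output format is an assumption about the utilipset package:

package main

import (
	"fmt"

	utilipset "k8s.io/kubernetes/pkg/util/ipset"
)

func main() {
	// A loopback (hairpin) entry for endpoint 10.244.1.5:8080/tcp, matching
	// the hash:ip,port,ip set type used by loopbackSet above.
	e := &utilipset.Entry{
		IP:       "10.244.1.5",
		Port:     8080,
		Protocol: "tcp",
		IP2:      "10.244.1.5",
		SetType:  utilipset.HashIPPortIP,
	}
	fmt.Println(e.String()) // assumed output: 10.244.1.5,tcp:8080,10.244.1.5
}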
@@ -1004,32 +1126,6 @@ func (proxier *Proxier) syncProxyRules() {
} else {
glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}
- // Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified.
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(svcInfo.clusterIP),
- "--dport", strconv.Itoa(svcInfo.port),
- )
- if proxier.masqueradeAll {
- err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
- if err != nil {
- glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
- }
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
- } else if len(proxier.clusterCIDR) > 0 {
- // This masquerades off-cluster traffic to a service VIP. The idea
- // is that you can establish a static route for your Service range,
- // routing to any node, and that node will bridge into the Service
- // for you. Since that might bounce off-node, we masquerade here.
- // If/when we support "Local" policy for VIPs, we should update this.
- err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
- if err != nil {
- glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
- }
- writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
- }
// Capture externalIPs.
for _, externalIP := range svcInfo.externalIPs {
@@ -1064,6 +1160,17 @@ func (proxier *Proxier) syncProxyRules() {
}
} // We're holding the port, so it's OK to install IPVS rules.
+ // ipset call
+ entry := &utilipset.Entry{
+ IP: externalIP,
+ Port: svcInfo.port,
+ Protocol: protocol,
+ SetType: utilipset.HashIPPort,
+ }
+ // We have to SNAT packets to external IPs.
+ proxier.externalIPSet.activeEntries.Insert(entry.String())
+ // ipvs call
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(externalIP),
Port: uint16(svcInfo.port),
@@ -1088,25 +1195,39 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress.
for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
if ingress.IP != "" {
+ // ipset call
+ entry = &utilipset.Entry{
+ IP: ingress.IP,
+ Port: svcInfo.port,
+ Protocol: protocol,
+ SetType: utilipset.HashIPPort,
+ }
+ // Add the service load balancer ingressIP:Port to the kubeServiceAccess ip set to help solve hairpin traffic.
+ // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
+ // If we are proxying globally, we need to masquerade in case we cross nodes.
+ // If we are proxying only locally, we can retain the source IP.
+ if !svcInfo.onlyNodeLocalEndpoints {
+ proxier.lbMasqSet.activeEntries.Insert(entry.String())
+ }
if len(svcInfo.loadBalancerSourceRanges) != 0 {
- err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
- if err != nil {
- glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
- }
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
- "-m", string(svcInfo.protocol), "-p", string(svcInfo.protocol),
- "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
- "--dport", fmt.Sprintf("%d", svcInfo.port),
- )
+ proxier.lbIngressSet.activeEntries.Insert(entry.String())
allowFromNode := false
for _, src := range svcInfo.loadBalancerSourceRanges {
- writeLine(proxier.natRules, append(args, "-s", src, "-j", "ACCEPT")...)
+ // ipset call
+ entry = &utilipset.Entry{
+ IP: ingress.IP,
+ Port: svcInfo.port,
+ Protocol: protocol,
+ Net: src,
+ SetType: utilipset.HashIPPortNet,
+ }
+ // Enumerate all whitelisted source CIDRs.
+ proxier.lbWhiteListCIDRSet.activeEntries.Insert(entry.String())
// ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src)
if cidr.Contains(proxier.nodeIP) {
@@ -1117,14 +1238,19 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host.
if allowFromNode {
- writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", "ACCEPT")...)
+ entry = &utilipset.Entry{
+ IP: ingress.IP,
+ Port: svcInfo.port,
+ Protocol: protocol,
+ IP2: ingress.IP,
+ SetType: utilipset.HashIPPortIP,
+ }
+ // Enumerate all whitelisted source IPs.
+ proxier.lbWhiteListIPSet.activeEntries.Insert(entry.String())
}
- // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
- // It means the packet cannot go through the firewall, then DROP it.
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
}
+ // ipvs call
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress.IP),
Port: uint16(svcInfo.port),
@@ -1170,12 +1296,33 @@ func (proxier *Proxier) syncProxyRules() {
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
+ // Nodeports need SNAT, unless they're local.
+ // ipset call
+ if !svcInfo.onlyNodeLocalEndpoints {
+ entry = &utilipset.Entry{
+ // No need to provide ip info
+ Port: svcInfo.nodePort,
+ Protocol: protocol,
+ SetType: utilipset.BitmapPort,
+ }
+ switch protocol {
+ case "tcp":
+ proxier.nodePortSetTCP.activeEntries.Insert(entry.String())
+ case "udp":
+ proxier.nodePortSetUDP.activeEntries.Insert(entry.String())
+ default:
+ // This should never be hit.
+ glog.Errorf("Unsupported protocol type: %s", protocol)
+ }
+ }
// Build ipvs kernel routes for each node ip address
nodeIPs, err := proxier.ipGetter.NodeIPs()
if err != nil {
glog.Errorf("Failed to get node IP, err: %v", err)
} else {
for _, nodeIP := range nodeIPs {
+ // ipvs call
serv := &utilipvs.VirtualServer{
Address: nodeIP,
Port: uint16(svcInfo.nodePort),
@@ -1200,6 +1347,119 @@ func (proxier *Proxier) syncProxyRules() {
}
}
+ // Sync ipset entries.
+ ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP,
+ proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet}
+ for i := range ipsetsToSync {
+ ipsetsToSync[i].syncIPSetEntries()
+ }
+ // Tail call the iptables rules for the ip sets, making sure iptables is
+ // called only once per ip set in a single sync loop.
+ if !proxier.loopbackSet.isEmpty() {
+ args = append(args[:0],
+ "-A", string(kubePostroutingChain),
+ "-m", "set", "--match-set", proxier.loopbackSet.Name,
+ "dst,dst,src",
+ )
+ writeLine(proxier.natRules, append(args, "-j", "MASQUERADE")...)
+ }
+ if !proxier.clusterIPSet.isEmpty() {
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "set", "--match-set", proxier.clusterIPSet.Name,
+ "dst,dst",
+ )
+ if proxier.masqueradeAll {
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ } else if len(proxier.clusterCIDR) > 0 {
+ // This masquerades off-cluster traffic to a service VIP. The idea
+ // is that you can establish a static route for your Service range,
+ // routing to any node, and that node will bridge into the Service
+ // for you. Since that might bounce off-node, we masquerade here.
+ // If/when we support "Local" policy for VIPs, we should update this.
+ writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
+ }
+ }
+ if !proxier.externalIPSet.isEmpty() {
+ // Build masquerade rules for packets to external IPs.
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "set", "--match-set", proxier.externalIPSet.Name,
+ "dst,dst",
+ )
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
+ // nor from a local process to be forwarded to the service.
+ // This rule roughly translates to "all traffic from off-machine".
+ // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
+ externalTrafficOnlyArgs := append(args,
+ "-m", "physdev", "!", "--physdev-is-in",
+ "-m", "addrtype", "!", "--src-type", "LOCAL")
+ writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
+ dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
+ // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
+ // This covers cases like GCE load-balancers which get added to the local routing table.
+ writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
+ }
+ if !proxier.lbMasqSet.isEmpty() {
+ // Build masquerade rules for packets which visit load balancer ingress IPs from other nodes.
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "set", "--match-set", proxier.lbMasqSet.Name,
+ "dst,dst",
+ )
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ }
+ if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
+ // Link the kube-services chain to the kube-fire-wall chain.
+ args := []string{"-m", "set", "--match-set", proxier.lbIngressSet.Name, "dst,dst", "-j", string(KubeFireWallChain)}
+ if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, kubeServicesChain, args...); err != nil {
+ glog.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.lbIngressSet.Name, kubeServicesChain, KubeFireWallChain, err)
+ }
+ if !proxier.lbWhiteListCIDRSet.isEmpty() {
+ args = append(args[:0],
+ "-A", string(KubeFireWallChain),
+ "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
+ "dst,dst,src",
+ )
+ writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
+ }
+ if !proxier.lbWhiteListIPSet.isEmpty() {
+ args = append(args[:0],
+ "-A", string(KubeFireWallChain),
+ "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
+ "dst,dst,src",
+ )
+ writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
+ }
+ args = append(args[:0],
+ "-A", string(KubeFireWallChain),
+ )
+ // If the packet was able to reach the end of the firewall chain, then it did not get DNATed.
+ // That means the packet cannot go through the firewall, so mark it for DROP.
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
+ }
+ if !proxier.nodePortSetTCP.isEmpty() {
+ // Build masquerade rules for packets which visit node ports from other nodes.
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "tcp", "-p", "tcp",
+ "-m", "set", "--match-set", proxier.nodePortSetTCP.Name,
+ "dst",
+ )
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ }
+ if !proxier.nodePortSetUDP.isEmpty() {
+ args = append(args[:0],
+ "-A", string(kubeServicesChain),
+ "-m", "udp", "-p", "udp",
+ "-m", "set", "--match-set", proxier.nodePortSetUDP.Name,
+ "dst",
+ )
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+ }
// Write the end-of-table markers.
writeLine(proxier.natRules, "COMMIT")
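Taken together, the block above replaces the old proxier's per-service iptables rules with a fixed number of set-match rules. Roughly, the nat table ends up looking like the sketch below when every set is non-empty (rule text reconstructed from the writeLine arguments above; the angle-bracketed names stand for the string values behind the Kube*Set constants, which this diff does not show, and the clusterIPSet rule varies with masqueradeAll/clusterCIDR):

// -A KUBE-POSTROUTING -m set --match-set <loopbackSet> dst,dst,src -j MASQUERADE
// -A KUBE-SERVICES -m set --match-set <clusterIPSet> dst,dst -j KUBE-MARK-MASQ
// -A KUBE-SERVICES -m set --match-set <externalIPSet> dst,dst -j KUBE-MARK-MASQ
// -A KUBE-SERVICES -m set --match-set <externalIPSet> dst,dst -m physdev ! --physdev-is-in -m addrtype ! --src-type LOCAL -j ACCEPT
// -A KUBE-SERVICES -m set --match-set <externalIPSet> dst,dst -m addrtype --dst-type LOCAL -j ACCEPT
// -A KUBE-SERVICES -m set --match-set <lbMasqSet> dst,dst -j KUBE-MARK-MASQ
// -A KUBE-SERVICES -m set --match-set <lbIngressSet> dst,dst -j KUBE-FIRE-WALL   (installed via EnsureRule, not the restore buffer)
// -A KUBE-FIRE-WALL -m set --match-set <lbWhiteListCIDRSet> dst,dst,src -j ACCEPT
// -A KUBE-FIRE-WALL -m set --match-set <lbWhiteListIPSet> dst,dst,src -j ACCEPT
// -A KUBE-FIRE-WALL -j KUBE-MARK-DROP
// -A KUBE-SERVICES -m tcp -p tcp -m set --match-set <nodePortSetTCP> dst -j KUBE-MARK-MASQ
// -A KUBE-SERVICES -m udp -p udp -m set --match-set <nodePortSetUDP> dst -j KUBE-MARK-MASQ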
@@ -1411,7 +1671,6 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre
}
// linkKubeServiceChain will create chain KUBE-SERVICES and link the chain in PREROUTING and OUTPUT
- // If not specify masqueradeAll or clusterCIDR or LB source range, won't create them.
// Chain PREROUTING (policy ACCEPT)
// target prot opt source destination
@@ -1451,6 +1710,55 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.
return nil
}
+ //// linkKubeIPSetsChain will create chain KUBE-SVC-IPSETS and link the chain in KUBE-SERVICES
+ //
+ //// Chain KUBE-SERVICES (policy ACCEPT)
+ //// target prot opt source destination
+ //// KUBE-SVC-IPSETS all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-SERVICE-ACCESS dst,dst
+ //
+ //// Chain KUBE-SVC-IPSETS (1 references)
+ //// target prot opt source destination
+ //// KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst
+ //// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL
+ //// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL
+ //// ...
+ //func (proxier *Proxier) linkKubeIPSetsChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
+ // if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeServiceIPSetsChain); err != nil {
+ // return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeServiceIPSetsChain, err)
+ // }
+ //
+ // // TODO: iptables comment message for ipset?
+ // // The hash:ip,port type of sets require two src/dst parameters of the set match and SET target kernel modules.
+ // args := []string{"-m", "set", "--match-set", proxier.kubeServiceAccessSet.Name, "dst,dst", "-j", string(KubeServiceIPSetsChain)}
+ // if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, kubeServicesChain, args...); err != nil {
+ // return fmt.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.kubeServiceAccessSet.Name, kubeServicesChain, KubeServiceIPSetsChain, err)
+ // }
+ //
+ // // equal to `iptables -t nat -N KUBE-SVC-IPSETS`
+ // // write `:KUBE-SERVICES - [0:0]` in nat table
+ // if chain, ok := existingNATChains[KubeServiceIPSetsChain]; ok {
+ // writeLine(natChains, chain)
+ // } else {
+ // writeLine(natChains, utiliptables.MakeChainLine(KubeServiceIPSetsChain))
+ // }
+ // return nil
+ //}
+ func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
+ // `iptables -t nat -N KUBE-FIRE-WALL`
+ if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil {
+ return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err)
+ }
+ // write `:KUBE-FIRE-WALL - [0:0]` in nat table
+ if chain, ok := existingNATChains[KubeFireWallChain]; ok {
+ writeLine(natChains, chain)
+ } else {
+ writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain))
+ }
+ return nil
+ }
// Join all words with spaces, terminate with newline and write to buff.
func writeLine(buf *bytes.Buffer, words ...string) {
// We avoid strings.Join for performance reasons.

View File

@@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
+ utilipset "k8s.io/kubernetes/pkg/util/ipset"
+ ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
@@ -85,7 +87,7 @@ func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedNa
return nil
}
- func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs []net.IP) *Proxier {
+ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP) *Proxier {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("dummy device have been created"), nil },
@@ -98,24 +100,34 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
return &Proxier{
exec: fexec,
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(testHostname),
iptables: ipt,
ipvs: ipvs,
+ ipset: ipset,
clusterCIDR: "10.0.0.0/24",
hostname: testHostname,
portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable),
portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}},
healthChecker: newFakeHealthChecker(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
+ loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
+ clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
+ externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
+ lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
+ lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false),
+ lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
+ lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
+ nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
+ nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
}
}
@@ -171,10 +183,11 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap
func TestNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
+ ipset := ipsettest.NewFake()
nodeIPv4 := net.ParseIP("100.101.102.103")
nodeIPv6 := net.ParseIP("2001:db8::1:1")
nodeIPs := sets.NewString(nodeIPv4.String(), nodeIPv6.String())
- fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIPv4, nodeIPv6})
+ fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIPv4, nodeIPv6})
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -248,8 +261,9 @@ func TestNodePort(t *testing.T) {
func TestNodePortNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
+ ipset := ipsettest.NewFake()
nodeIP := net.ParseIP("100.101.102.103")
- fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP})
+ fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -301,7 +315,8 @@ func TestNodePortNoEndpoint(t *testing.T) {
func TestClusterIPNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcPortName := proxy.ServicePortName{
@@ -344,7 +359,8 @@ func TestClusterIPNoEndpoint(t *testing.T) {
func TestClusterIP(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIPv4 := "10.20.30.41"
svcPortV4 := 80
@@ -450,7 +466,8 @@ func TestClusterIP(t *testing.T) {
func TestExternalIPsNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := "50.60.70.81"
@@ -504,7 +521,8 @@ func TestExternalIPsNoEndpoint(t *testing.T) {
func TestExternalIPs(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := sets.NewString("50.60.70.81", "2012::51")
@@ -573,7 +591,8 @@ func TestExternalIPs(t *testing.T) {
func TestLoadBalancer(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -624,8 +643,9 @@ func strPtr(s string) *string {
func TestOnlyLocalNodePorts(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
+ ipset := ipsettest.NewFake()
nodeIP := net.ParseIP("100.101.102.103")
- fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP})
+ fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -705,7 +725,8 @@ func TestOnlyLocalNodePorts(t *testing.T) {
func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -769,7 +790,8 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
func TestBuildServiceMapAddRemove(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
services := []*api.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
@@ -874,7 +896,8 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
func TestBuildServiceMapServiceHeadless(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
makeServiceMap(fp,
makeTestService("somewhere-else", "headless", func(svc *api.Service) {
@@ -907,7 +930,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
makeServiceMap(fp,
makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
@@ -934,7 +958,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
func TestBuildServiceMapServiceUpdate(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP
@@ -1016,8 +1041,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
func TestSessionAffinity(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
+ ipset := ipsettest.NewFake()
nodeIP := net.ParseIP("100.101.102.103")
- fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP})
+ fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -1879,7 +1905,8 @@ func Test_updateEndpointsMap(t *testing.T) {
for tci, tc := range testCases {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
- fp := NewFakeProxier(ipt, ipvs, nil)
+ ipset := ipsettest.NewFake()
+ fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp.hostname = nodeName
// First check that after adding all previous versions of endpoints,

View File

@@ -19,6 +19,7 @@ package util
import (
"fmt"
"net"
+ "strconv"
"github.com/golang/glog"
)
@@ -40,6 +41,21 @@ func IPPart(s string) string {
return ip
}
+ // PortPart returns just the port part of an "IP:port" endpoint string.
+ func PortPart(s string) (int, error) {
+ // Must be IP:port
+ _, port, err := net.SplitHostPort(s)
+ if err != nil {
+ glog.Errorf("Error parsing '%s': %v", s, err)
+ return -1, err
+ }
+ portNumber, err := strconv.Atoi(port)
+ if err != nil {
+ glog.Errorf("Error parsing '%s': %v", port, err)
+ return -1, err
+ }
+ return portNumber, nil
+ }
// ToCIDR returns a host address of the form <ip-address>/32 for
// IPv4 and <ip-address>/128 for IPv6
func ToCIDR(ip net.IP) string {
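A quick usage sketch for the new helper; note that net.SplitHostPort requires IPv6 endpoints to be bracketed, which is the form the endpoints map produces:

package main

import (
	"fmt"

	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
)

func main() {
	// IPv4 endpoint string of the form produced by the endpoints map.
	port, err := utilproxy.PortPart("10.244.1.5:8080")
	fmt.Println(port, err) // 8080 <nil>

	// IPv6 endpoints must be bracketed for net.SplitHostPort to parse them.
	port, err = utilproxy.PortPart("[2001:db8::1]:8080")
	fmt.Println(port, err) // 8080 <nil>
}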