Merge pull request #20496 from matthewdupre/masquerade-config

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-08 10:49:20 -08:00
commit b32078d89b
15 changed files with 2690 additions and 2364 deletions

View File

@ -71,6 +71,7 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.Var(componentconfig.PortRangeVar{&s.PortRange}, "proxy-port-range", "Range of host ports (beginPort-endPort, inclusive) that may be consumed in order to proxy service traffic. If unspecified (0-0) then ports will be randomly chosen.")
fs.StringVar(&s.HostnameOverride, "hostname-override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
fs.Var(&s.Mode, "proxy-mode", "Which proxy mode to use: 'userspace' (older) or 'iptables' (faster). If blank, look at the Node object on the Kubernetes API and respect the '"+ExperimentalProxyModeAnnotation+"' annotation if provided. Otherwise use the best-available proxy (currently iptables). If the iptables proxy is selected, regardless of how, but the system's kernel or iptables versions are insufficient, this always falls back to the userspace proxy.")
fs.IntVar(s.IPTablesMasqueradeBit, "iptables-masquerade-bit", util.IntPtrDerefOr(s.IPTablesMasqueradeBit, 14), "If using the pure iptables proxy, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].")
fs.DurationVar(&s.IPTablesSyncPeriod.Duration, "iptables-sync-period", s.IPTablesSyncPeriod.Duration, "How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&s.ConfigSyncPeriod, "config-sync-period", s.ConfigSyncPeriod, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
fs.BoolVar(&s.MasqueradeAll, "masquerade-all", s.MasqueradeAll, "If using the pure iptables proxy, SNAT everything")

View File

@ -191,18 +191,23 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
proxyMode := getProxyMode(string(config.Mode), client.Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
if proxyMode == proxyModeIptables {
glog.V(2).Info("Using iptables Proxier.")
proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll)
glog.V(0).Info("Using iptables Proxier.")
if config.IPTablesMasqueradeBit == nil {
// IPTablesMasqueradeBit must be specified or defaulted.
return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
}
proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, *config.IPTablesMasqueradeBit)
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
proxier = proxierIptables
endpointsHandler = proxierIptables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.")
glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface)
} else {
glog.V(2).Info("Using userspace Proxier.")
glog.V(0).Info("Using userspace Proxier.")
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler.
loadBalancer := userspace.NewLoadBalancerRR()
@ -222,7 +227,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
}
proxier = proxierUserspace
// Remove artifacts from the pure-iptables Proxier.
glog.V(2).Info("Tearing down pure-iptables proxy rules. Errors here are acceptable.")
glog.V(0).Info("Tearing down pure-iptables proxy rules.")
iptables.CleanupLeftovers(iptInterface)
}
iptInterface.AddReloadFunc(proxier.Sync)

View File

@ -63,6 +63,7 @@ kube-proxy
--healthz-bind-address=127.0.0.1: The IP address for the health check server to serve on, defaulting to 127.0.0.1 (set to 0.0.0.0 for all interfaces)
--healthz-port=10249: The port to bind the health check server. Use 0 to disable.
--hostname-override="": If non-empty, will use this string as identification instead of the actual hostname.
--iptables-masquerade-bit=14: If using the pure iptables proxy, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].
--iptables-sync-period=30s: How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.
--kube-api-burst=10: Burst to use while talking with kubernetes apiserver
--kube-api-qps=5: QPS to use while talking with kubernetes apiserver
@ -76,7 +77,7 @@ kube-proxy
--udp-timeout=250ms: How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace
```
###### Auto generated by spf13/cobra on 1-Feb-2016
###### Auto generated by spf13/cobra on 7-Feb-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -513,11 +513,11 @@ then look at the logs again.
```console
u@node$ iptables-save | grep hostnames
-A KUBE-SEP-57KPRZ3JQVENLNBR -s 10.244.3.6/32 -m comment --comment "default/hostnames:" -j MARK --set-xmark 0x4d415351/0xffffffff
-A KUBE-SEP-57KPRZ3JQVENLNBR -s 10.244.3.6/32 -m comment --comment "default/hostnames:" -j MARK --set-xmark 0x00004000/0x00004000
-A KUBE-SEP-57KPRZ3JQVENLNBR -p tcp -m comment --comment "default/hostnames:" -m tcp -j DNAT --to-destination 10.244.3.6:9376
-A KUBE-SEP-WNBA2IHDGP2BOBGZ -s 10.244.1.7/32 -m comment --comment "default/hostnames:" -j MARK --set-xmark 0x4d415351/0xffffffff
-A KUBE-SEP-WNBA2IHDGP2BOBGZ -s 10.244.1.7/32 -m comment --comment "default/hostnames:" -j MARK --set-xmark 0x00004000/0x00004000
-A KUBE-SEP-WNBA2IHDGP2BOBGZ -p tcp -m comment --comment "default/hostnames:" -m tcp -j DNAT --to-destination 10.244.1.7:9376
-A KUBE-SEP-X3P2623AGDH6CDF3 -s 10.244.2.3/32 -m comment --comment "default/hostnames:" -j MARK --set-xmark 0x4d415351/0xffffffff
-A KUBE-SEP-X3P2623AGDH6CDF3 -s 10.244.2.3/32 -m comment --comment "default/hostnames:" -j MARK --set-xmark 0x00004000/0x00004000
-A KUBE-SEP-X3P2623AGDH6CDF3 -p tcp -m comment --comment "default/hostnames:" -m tcp -j DNAT --to-destination 10.244.2.3:9376
-A KUBE-SERVICES -d 10.0.1.175/32 -p tcp -m comment --comment "default/hostnames: cluster IP" -m tcp --dport 80 -j KUBE-SVC-NWV5X2332I4OT4T3
-A KUBE-SVC-NWV5X2332I4OT4T3 -m comment --comment "default/hostnames:" -m statistic --mode random --probability 0.33332999982 -j KUBE-SEP-WNBA2IHDGP2BOBGZ

View File

@ -148,6 +148,7 @@ input-dirs
insecure-bind-address
insecure-port
insecure-skip-tls-verify
iptables-masquerade-bit
iptables-sync-period
ir-data-source
ir-dbname

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,9 @@ type KubeProxyConfiguration struct {
HealthzPort int `json:"healthzPort"`
// hostnameOverride, if non-empty, will be used as the identity instead of the actual hostname.
HostnameOverride string `json:"hostnameOverride"`
// iptablesMasqueradeBit is the bit of the iptables fwmark space to use for SNAT if using
// the pure iptables proxy mode. Values must be within the range [0, 31].
IPTablesMasqueradeBit *int `json:"iptablesMasqueradeBit"`
// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`

View File

@ -37,6 +37,12 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
out.HealthzBindAddress = in.HealthzBindAddress
out.HealthzPort = int32(in.HealthzPort)
out.HostnameOverride = in.HostnameOverride
if in.IPTablesMasqueradeBit != nil {
out.IPTablesMasqueradeBit = new(int32)
*out.IPTablesMasqueradeBit = int32(*in.IPTablesMasqueradeBit)
} else {
out.IPTablesMasqueradeBit = nil
}
if err := s.Convert(&in.IPTablesSyncPeriod, &out.IPTablesSyncPeriod, 0); err != nil {
return err
}
@ -127,6 +133,12 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
out.HealthzBindAddress = in.HealthzBindAddress
out.HealthzPort = int(in.HealthzPort)
out.HostnameOverride = in.HostnameOverride
if in.IPTablesMasqueradeBit != nil {
out.IPTablesMasqueradeBit = new(int)
*out.IPTablesMasqueradeBit = int(*in.IPTablesMasqueradeBit)
} else {
out.IPTablesMasqueradeBit = nil
}
if err := s.Convert(&in.IPTablesSyncPeriod, &out.IPTablesSyncPeriod, 0); err != nil {
return err
}

View File

@ -43,6 +43,12 @@ func deepCopy_v1alpha1_KubeProxyConfiguration(in KubeProxyConfiguration, out *Ku
out.HealthzBindAddress = in.HealthzBindAddress
out.HealthzPort = in.HealthzPort
out.HostnameOverride = in.HostnameOverride
if in.IPTablesMasqueradeBit != nil {
out.IPTablesMasqueradeBit = new(int32)
*out.IPTablesMasqueradeBit = *in.IPTablesMasqueradeBit
} else {
out.IPTablesMasqueradeBit = nil
}
if err := deepCopy_unversioned_Duration(in.IPTablesSyncPeriod, &out.IPTablesSyncPeriod, c); err != nil {
return err
}

View File

@ -55,6 +55,10 @@ func addDefaultingFuncs(scheme *runtime.Scheme) {
if obj.ConntrackMax == 0 {
obj.ConntrackMax = 256 * 1024 // 4x default (64k)
}
if obj.IPTablesMasqueradeBit == nil {
temp := int32(14)
obj.IPTablesMasqueradeBit = &temp
}
if obj.ConntrackTCPEstablishedTimeout == zero {
obj.ConntrackTCPEstablishedTimeout = unversioned.Duration{Duration: 24 * time.Hour} // 1 day (1/5 default)
}

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,9 @@ type KubeProxyConfiguration struct {
HealthzPort int32 `json:"healthzPort"`
// hostnameOverride, if non-empty, will be used as the identity instead of the actual hostname.
HostnameOverride string `json:"hostnameOverride"`
// iptablesMasqueradeBit is the bit of the iptables fwmark space to use for SNAT if using
// the pure iptables proxy mode. Values must be within the range [0, 31].
IPTablesMasqueradeBit *int32 `json:"iptablesMasqueradeBit"`
// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`

View File

@ -54,13 +54,20 @@ import (
const iptablesMinVersion = utiliptables.MinCheckVersion
// the services chain
const iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES"
const kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// the nodeports chain
const iptablesNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
// the kubernetes postrouting chain
const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
// the mark-for-masquerade chain
const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// the mark we apply to traffic needing SNAT
const iptablesMasqueradeMark = "0x4d415351"
// TODO(thockin): Remove this for v1.3 or v1.4.
const oldIptablesMasqueradeMark = "0x4d415351"
// IptablesVersioner can query the current iptables version.
type IptablesVersioner interface {
@ -126,8 +133,7 @@ type serviceInfo struct {
loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity
stickyMaxAgeSeconds int
// Deprecated, but required for back-compat (including e2e)
externalIPs []string
externalIPs []string
}
// returns a new serviceInfo struct
@ -149,9 +155,10 @@ type Proxier struct {
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
iptables utiliptables.Interface
masqueradeAll bool
syncPeriod time.Duration
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
}
type localPort struct {
@ -177,7 +184,7 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool) (*Proxier, error) {
func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int) (*Proxier, error) {
// Set the route_localnet sysctl we need for
if err := utilsysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
@ -191,50 +198,122 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
glog.Warningf("can't set sysctl %s: %v", sysctlBridgeCallIptables, err)
}
// Generate the masquerade mark to use for SNAT rules.
if masqueradeBit < 0 || masqueradeBit > 31 {
return nil, fmt.Errorf("invalid iptables-masquerade-bit %v not in [0, 31]", masqueradeBit)
}
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]string),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]string),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
}, nil
}
// CleanupLeftovers removes all iptables rules and chains created by the Proxier
// It returns true if an error was encountered. Errors are logged.
func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
//TODO: actually tear down all rules and chains.
args := []string{"-m", "comment", "--comment", "kubernetes service portals", "-j", string(iptablesServicesChain)}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, args...); err != nil {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
encounteredError = true
// Unlink the services chain.
args := []string{
"-m", "comment", "--comment", "kubernetes service portals",
"-j", string(kubeServicesChain),
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPrerouting, args...); err != nil {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
encounteredError = true
tableChainsWithJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableFilter, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
args = []string{"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT", "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
encounteredError = true
}
// flush and delete chains.
chains := []utiliptables.Chain{iptablesServicesChain, iptablesNodePortsChain}
for _, c := range chains {
// flush chain, then if sucessful delete, delete will fail if flush fails.
if err := ipt.FlushChain(utiliptables.TableNAT, c); err != nil {
glog.Errorf("Error flushing pure-iptables proxy chain: %v", err)
encounteredError = true
} else {
if err = ipt.DeleteChain(utiliptables.TableNAT, c); err != nil {
glog.Errorf("Error deleting pure-iptables proxy chain: %v", err)
for _, tc := range tableChainsWithJumpServices {
if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
encounteredError = true
}
}
}
// Unlink the postrouting chain.
args = []string{
"-m", "comment", "--comment", "kubernetes postrouting rules",
"-j", string(kubePostroutingChain),
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
encounteredError = true
}
}
// Flush and remove all of our chains.
if iptablesSaveRaw, err := ipt.Save(utiliptables.TableNAT); err != nil {
glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err)
encounteredError = true
} else {
existingNATChains := getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
// Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
if _, found := existingNATChains[chain]; found {
chainString := string(chain)
writeLine(natChains, existingNATChains[chain]) // flush
writeLine(natRules, "-X", chainString) // delete
}
}
// Hunt for service and endpoint chains.
for chain := range existingNATChains {
chainString := string(chain)
if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") {
writeLine(natChains, existingNATChains[chain]) // flush
writeLine(natRules, "-X", chainString) // delete
}
}
writeLine(natRules, "COMMIT")
natLines := append(natChains.Bytes(), natRules.Bytes()...)
// Write it.
err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err)
encounteredError = true
}
}
{
filterBuf := bytes.NewBuffer(nil)
writeLine(filterBuf, "*filter")
writeLine(filterBuf, fmt.Sprintf(":%s - [0:0]", kubeServicesChain))
writeLine(filterBuf, fmt.Sprintf("-X %s", kubeServicesChain))
writeLine(filterBuf, "COMMIT")
// Write it.
if err := ipt.Restore(utiliptables.TableFilter, filterBuf.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
glog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err)
encounteredError = true
}
}
// Clean up the older SNAT rule which was directly in POSTROUTING.
// TODO(thockin): Remove this for v1.3 or v1.4.
args = []string{
"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT",
"-m", "mark", "--mark", oldIptablesMasqueradeMark,
"-j", "MASQUERADE",
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing old-style SNAT rule: %v", err)
encounteredError = true
}
}
return encounteredError
}
@ -481,37 +560,45 @@ func (proxier *Proxier) syncProxyRules() {
}
glog.V(3).Infof("Syncing iptables rules")
// Ensure main chains and rules are installed.
tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
for _, table := range tablesNeedServicesChain {
if _, err := proxier.iptables.EnsureChain(table, iptablesServicesChain); err != nil {
glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, iptablesServicesChain, err)
return
}
}
// Link the services chain.
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableFilter, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
comment := "kubernetes service portals"
args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, iptablesServicesChain, err)
return
}
}
// Link the output rules.
// Create and link the kube services chain.
{
comment := "kubernetes service traffic requiring SNAT"
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"}
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
glog.Errorf("Failed to ensure that chain %s obeys MASQUERADE mark: %v", utiliptables.ChainPostrouting, err)
tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
for _, table := range tablesNeedServicesChain {
if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil {
glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err)
return
}
}
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableFilter, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
comment := "kubernetes service portals"
args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
return
}
}
}
// Create and link the kube postrouting chain.
{
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil {
glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err)
return
}
comment := "kubernetes postrouting rules"
args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err)
return
}
}
@ -521,7 +608,7 @@ func (proxier *Proxier) syncProxyRules() {
existingFilterChains := make(map[utiliptables.Chain]string)
iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableFilter)
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules. %s", err.Error())
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingFilterChains = getChainLines(utiliptables.TableFilter, iptablesSaveRaw)
}
@ -529,7 +616,7 @@ func (proxier *Proxier) syncProxyRules() {
existingNATChains := make(map[utiliptables.Chain]string)
iptablesSaveRaw, err = proxier.iptables.Save(utiliptables.TableNAT)
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules. %s", err.Error())
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingNATChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
}
@ -544,24 +631,52 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(natChains, "*nat")
// Make sure we keep stats for the top-level chains, if they existed
// (which they should have because we created them above).
if chain, ok := existingFilterChains[iptablesServicesChain]; ok {
// (which most should have because we created them above).
if chain, ok := existingFilterChains[kubeServicesChain]; ok {
writeLine(filterChains, chain)
} else {
writeLine(filterChains, makeChainLine(iptablesServicesChain))
writeLine(filterChains, makeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[iptablesServicesChain]; ok {
if chain, ok := existingNATChains[kubeServicesChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(iptablesServicesChain))
writeLine(natChains, makeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[iptablesNodePortsChain]; ok {
if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(iptablesNodePortsChain))
writeLine(natChains, makeChainLine(kubeNodePortsChain))
}
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubePostroutingChain))
}
if chain, ok := existingNATChains[kubeMarkMasqChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeMarkMasqChain))
}
// Accumulate nat chains to keep.
// Install the kubernetes-specific postrouting rules. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
"-A", string(kubePostroutingChain),
"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
"-m", "mark", "--mark", proxier.masqueradeMark,
"-j", "MASQUERADE",
}...)
// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
"-A", string(kubeMarkMasqChain),
"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
}...)
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate new local ports that we have opened.
@ -582,18 +697,16 @@ func (proxier *Proxier) syncProxyRules() {
// Capture the clusterIP.
args := []string{
"-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", svcName.String()),
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
if proxier.masqueradeAll {
writeLine(natRules, append(args,
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
}
writeLine(natRules, append(args,
"-j", string(svcChain))...)
writeLine(natRules, append(args, "-j", string(svcChain))...)
// Capture externalIPs.
for _, externalIP := range svcInfo.externalIPs {
@ -621,15 +734,14 @@ func (proxier *Proxier) syncProxyRules() {
}
} // We're holding the port, so it's OK to install iptables rules.
args := []string{
"-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", svcName.String()),
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP),
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
// We have to SNAT packets to external IPs.
writeLine(natRules, append(args,
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
// nor from a local process to be forwarded to the service.
@ -638,30 +750,26 @@ func (proxier *Proxier) syncProxyRules() {
externalTrafficOnlyArgs := append(args,
"-m", "physdev", "!", "--physdev-is-in",
"-m", "addrtype", "!", "--src-type", "LOCAL")
writeLine(natRules, append(externalTrafficOnlyArgs,
"-j", string(svcChain))...)
writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
// This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(natRules, append(dstLocalOnlyArgs,
"-j", string(svcChain))...)
writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
}
// Capture load-balancer ingress.
for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
if ingress.IP != "" {
args := []string{
"-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", svcName.String()),
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", ingress.IP),
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
// We have to SNAT packets from external IPs.
writeLine(natRules, append(args,
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
writeLine(natRules, append(args,
"-j", string(svcChain))...)
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(svcChain))...)
}
}
@ -687,27 +795,24 @@ func (proxier *Proxier) syncProxyRules() {
}
newLocalPorts[lp] = socket
} // We're holding the port, so it's OK to install iptables rules.
args := []string{
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
}
// Nodeports need SNAT.
writeLine(natRules,
"-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
// Jump to the service chain.
writeLine(natRules,
"-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
"-j", string(svcChain))
writeLine(natRules, append(args, "-j", string(svcChain))...)
}
// If the service has no endpoints then reject packets.
if len(proxier.endpointsMap[svcName]) == 0 {
writeLine(filterRules,
"-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s has no endpoints\"", svcName.String()),
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", svcInfo.port),
@ -777,16 +882,14 @@ func (proxier *Proxier) syncProxyRules() {
// TODO: if we grow logic to get this node's pod CIDR, we can use it.
writeLine(natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
"-j", string(kubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args,
"-m", protocol, "-p", protocol,
"-j", "DNAT", "--to-destination", endpoints[i])
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i])
writeLine(natRules, args...)
}
}
@ -810,10 +913,10 @@ func (proxier *Proxier) syncProxyRules() {
// Finally, tail-call to the nodeports chain. This needs to be after all
// other service portal rules.
writeLine(natRules,
"-A", string(iptablesServicesChain),
"-m", "comment", "--comment", "\"kubernetes service nodeports; NOTE: this must be the last rule in this chain\"",
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
"-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(iptablesNodePortsChain))
"-j", string(kubeNodePortsChain))
// Write the end-of-table markers.
writeLine(filterRules, "COMMIT")
@ -825,23 +928,37 @@ func (proxier *Proxier) syncProxyRules() {
natLines := append(natChains.Bytes(), natRules.Bytes()...)
lines := append(filterLines, natLines...)
glog.V(3).Infof("Syncing iptables rules: %s", lines)
glog.V(3).Infof("Restoring iptables rules: %s", lines)
err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to sync iptables rules: %v", err)
glog.Errorf("Failed to execute iptables-restore: %v", err)
// Revert new local ports.
for k, v := range newLocalPorts {
glog.Errorf("Closing local port %s", k.String())
v.Close()
}
} else {
// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
if newLocalPorts[k] == nil {
v.Close()
}
return
}
// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
if newLocalPorts[k] == nil {
v.Close()
}
}
proxier.portsMap = newLocalPorts
// Clean up the older SNAT rule which was directly in POSTROUTING.
// TODO(thockin): Remove this for v1.3 or v1.4.
args := []string{
"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT",
"-m", "mark", "--mark", oldIptablesMasqueradeMark,
"-j", "MASQUERADE",
}
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing old-style SNAT rule: %v", err)
}
proxier.portsMap = newLocalPorts
}
}

View File

@ -196,27 +196,37 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
// Delete Rules first, then Flush and Delete Chains
args := []string{"-m", "comment", "--comment", "handle ClusterIPs; NOTE: this must be before the NodePort rules"}
if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
}
}
if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
}
}
args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
}
}
if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
}
}
args = []string{"-m", "comment", "--comment", "Ensure that non-local NodePort traffic can flow"}
if err := ipt.DeleteRule(iptables.TableFilter, iptables.ChainInput, append(args, "-j", string(iptablesNonLocalNodePortChain))...); err != nil {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error removing userspace rule: %v", err)
encounteredError = true
}
}
// flush and delete chains.
@ -228,12 +238,16 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
for _, c := range chains {
// flush chain, then if successful delete, delete will fail if flush fails.
if err := ipt.FlushChain(table, c); err != nil {
glog.Errorf("Error flushing userspace chain: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error flushing userspace chain: %v", err)
encounteredError = true
}
} else {
if err = ipt.DeleteChain(table, c); err != nil {
glog.Errorf("Error deleting userspace chain: %v", err)
encounteredError = true
if !iptables.IsNotFoundError(err) {
glog.Errorf("Error deleting userspace chain: %v", err)
encounteredError = true
}
}
}
}

View File

@ -296,6 +296,7 @@ func (runner *runner) Save(table Table) ([]byte, error) {
// run and return
args := []string{"-t", string(table)}
glog.V(4).Infof("running iptables-save %v", args)
return runner.exec.Command(cmdIptablesSave, args...).CombinedOutput()
}
@ -305,6 +306,7 @@ func (runner *runner) SaveAll() ([]byte, error) {
defer runner.mu.Unlock()
// run and return
glog.V(4).Infof("running iptables-save")
return runner.exec.Command(cmdIptablesSave, []string{}...).CombinedOutput()
}
@ -354,6 +356,7 @@ func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFla
return err
}
// run the command and return the output or an error including the output and error
glog.V(4).Infof("running iptables-restore %v", args)
b, err := runner.exec.Command(cmdIptablesRestore, args...).CombinedOutput()
if err != nil {
return fmt.Errorf("%v (%s)", err, b)
@ -576,3 +579,17 @@ func (runner *runner) reload() {
f()
}
}
// IsNotFoundError returns true if the error indicates "not found". It parses
// the error string looking for known values, which is imperfect but works in
// practice.
func IsNotFoundError(err error) bool {
es := err.Error()
if strings.Contains(es, "No such file or directory") {
return true
}
if strings.Contains(es, "No chain/target/match by that name") {
return true
}
return false
}