Merge pull request #12764 from thockin/proxy-nodeports

More corner cases with iptables proxy
This commit is contained in:
Robert Bailey 2015-08-18 09:13:20 -07:00
commit c45747bfad
5 changed files with 336 additions and 257 deletions

View File

@ -159,7 +159,8 @@ func (s *ProxyServer) Run(_ []string) error {
if !s.ForceUserspaceProxy && shouldUseIptables { if !s.ForceUserspaceProxy && shouldUseIptables {
glog.V(2).Info("Using iptables Proxier.") glog.V(2).Info("Using iptables Proxier.")
proxierIptables, err := iptables.NewProxier(utiliptables.New(exec.New(), protocol), s.SyncPeriod) execer := exec.New()
proxierIptables, err := iptables.NewProxier(utiliptables.New(execer, protocol), execer, s.SyncPeriod)
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }

View File

@ -19,6 +19,7 @@ package config
import ( import (
"sync" "sync"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
@ -91,6 +92,7 @@ func NewEndpointsConfig() *EndpointsConfig {
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
handler.OnEndpointsUpdate(instance.([]api.Endpoints)) handler.OnEndpointsUpdate(instance.([]api.Endpoints))
})) }))
} }
@ -126,19 +128,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
update := change.(EndpointsUpdate) update := change.(EndpointsUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints) glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
endpoints[name] = value endpoints[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing an endpoint %+v", update) glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update))
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(endpoints, name) delete(endpoints, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting endpoints %+v", update) glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
endpoints = make(map[types.NamespacedName]api.Endpoints) endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
@ -146,7 +148,7 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
endpoints[name] = value endpoints[name] = value
} }
default: default:
glog.V(4).Infof("Received invalid update type: %v", update) glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
} }
s.endpoints[source] = endpoints s.endpoints[source] = endpoints
s.endpointLock.Unlock() s.endpointLock.Unlock()
@ -189,6 +191,7 @@ func NewServiceConfig() *ServiceConfig {
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
handler.OnServiceUpdate(instance.([]api.Service)) handler.OnServiceUpdate(instance.([]api.Service))
})) }))
} }
@ -224,19 +227,19 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
update := change.(ServiceUpdate) update := change.(ServiceUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services) glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
for _, value := range update.Services { for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
services[name] = value services[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing a service %+v", update) glog.V(4).Infof("Removing a service %s", spew.Sdump(update))
for _, value := range update.Services { for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(services, name) delete(services, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting services %+v", update) glog.V(4).Infof("Setting services %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
services = make(map[types.NamespacedName]api.Service) services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services { for _, value := range update.Services {
@ -244,7 +247,7 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
services[name] = value services[name] = value
} }
default: default:
glog.V(4).Infof("Received invalid update type: %v", update) glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
} }
s.services[source] = services s.services[source] = services
s.serviceLock.Unlock() s.serviceLock.Unlock()

View File

@ -16,16 +16,18 @@ limitations under the License.
package iptables package iptables
/* //
NOTE: this needs to be tested in e2e since it uses iptables for everything. // NOTE: this needs to be tested in e2e since it uses iptables for everything.
*/ //
import ( import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"path"
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
@ -33,6 +35,7 @@ import (
"time" "time"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
@ -42,26 +45,32 @@ import (
"k8s.io/kubernetes/pkg/util/slice" "k8s.io/kubernetes/pkg/util/slice"
) )
// NOTE: IPTABLES_MIN_VERSION is the minimum version of iptables for which we will use the Proxier // iptablesMinVersion is the minimum version of iptables for which we will use the Proxier
// from this package instead of the userspace Proxier. // from this package instead of the userspace Proxier. While most of the
// This will not be enough, as the version number is somewhat unreliable, // features we need were available earlier, the '-C' flag was added more
// features are backported in various distros and this could get pretty hairy. // recently. We use that indirectly in Ensure* functions, and if we don't
// However iptables-1.4.0 was released 2007-Dec-22 and appears to have every feature we use, // have it, we have to be extra careful about the exact args we feed in being
// so this seems perfectly reasonable for now. // the same as the args we read back (iptables itself normalizes some args).
const ( // This is the "new" Proxier, so we require "new" versions of tools.
IPTABLES_MIN_VERSION string = "1.4.0" const iptablesMinVersion = utiliptables.MinCheckVersion
)
// the services chain // the services chain
var iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES" const iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES"
// ShouldUseIptablesProxier returns true if we should use the iptables Proxier instead of // the nodeports chain
// the userspace Proxier. const iptablesNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
// This is determined by the iptables version. It may return an error if it fails to get the
// iptables version without error, in which case it will also return false. // the mark we apply to traffic needing SNAT
const iptablesMasqueradeMark = "0x4d415351"
// ShouldUseIptablesProxier returns true if we should use the iptables Proxier
// instead of the "classic" userspace Proxier. This is determined by checking
// the iptables version and for the existence of kernel features. It may return
// an error if it fails to get the iptables version without error, in which
// case it will also return false.
func ShouldUseIptablesProxier() (bool, error) { func ShouldUseIptablesProxier() (bool, error) {
exec := utilexec.New() exec := utilexec.New()
minVersion, err := semver.NewVersion(IPTABLES_MIN_VERSION) minVersion, err := semver.NewVersion(iptablesMinVersion)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -75,21 +84,50 @@ func ShouldUseIptablesProxier() (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
return !version.LessThan(*minVersion), nil if version.LessThan(*minVersion) {
return false, nil
}
// Check for the required sysctls. We don't care about the value, just
// that it exists. If this Proxier is chosen, we'll initialize it as we
// need.
_, err = getSysctl(sysctlRouteLocalnet)
if err != nil {
return false, err
}
return true, nil
} }
type portal struct { const sysctlBase = "/proc/sys"
ip net.IP const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
port int const sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables"
protocol api.Protocol
func getSysctl(sysctl string) (int, error) {
data, err := ioutil.ReadFile(path.Join(sysctlBase, sysctl))
if err != nil {
return -1, err
}
val, err := strconv.Atoi(strings.Trim(string(data), " \n"))
if err != nil {
return -1, err
}
return val, nil
}
func setSysctl(sysctl string, newVal int) error {
return ioutil.WriteFile(path.Join(sysctlBase, sysctl), []byte(strconv.Itoa(newVal)), 0640)
} }
// internal struct for storing service information // internal struct for storing service information
type serviceInfo struct { type serviceInfo struct {
portal portal clusterIP net.IP
port int
protocol api.Protocol
nodePort int
loadBalancerStatus api.LoadBalancerStatus loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity sessionAffinityType api.ServiceAffinity
stickyMaxAgeMinutes int stickyMaxAgeSeconds int
endpoints []string endpoints []string
// Deprecated, but required for back-compat (including e2e) // Deprecated, but required for back-compat (including e2e)
deprecatedPublicIPs []string deprecatedPublicIPs []string
@ -99,7 +137,7 @@ type serviceInfo struct {
func newServiceInfo(service proxy.ServicePortName) *serviceInfo { func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
return &serviceInfo{ return &serviceInfo{
sessionAffinityType: api.ServiceAffinityNone, // default sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. stickyMaxAgeSeconds: 180, // TODO: parameterize this in the API.
} }
} }
@ -122,10 +160,25 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock. // An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and // Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails. // will not terminate if a particular iptables call fails.
func NewProxier(ipt utiliptables.Interface, syncPeriod time.Duration) (*Proxier, error) { func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration) (*Proxier, error) {
// Set the route_localnet sysctl we need for
if err := setSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
}
// Load the module. It's OK if this fails (e.g. the module is not present)
// because we'll catch the error on the sysctl, which is what we actually
// care about.
exec.Command("modprobe", "br-netfilter").CombinedOutput()
if err := setSysctl(sysctlBridgeCallIptables, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlBridgeCallIptables, err)
}
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.") glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.")
// remove iptables rules/chains from the userspace Proxier
tearDownUserspaceIptables(ipt) tearDownUserspaceIptables(ipt)
return &Proxier{ return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo), serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
@ -177,10 +230,10 @@ func tearDownUserspaceIptables(ipt utiliptables.Interface) {
} }
func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
if info.portal.protocol != port.Protocol || info.portal.port != port.Port { if info.protocol != port.Protocol || info.port != port.Port || info.nodePort != port.NodePort {
return false return false
} }
if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) { if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) {
return false return false
} }
if !ipsEqual(info.deprecatedPublicIPs, service.Spec.DeprecatedPublicIPs) { if !ipsEqual(info.deprecatedPublicIPs, service.Spec.DeprecatedPublicIPs) {
@ -231,7 +284,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true proxier.haveReceivedServiceUpdate = true
glog.V(4).Infof("Received service update notice: %+v", allServices)
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for i := range allServices { for i := range allServices {
@ -239,7 +291,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
// if ClusterIP is "None" or empty, skip proxying // if ClusterIP is "None" or empty, skip proxying
if !api.IsServiceIPSet(service) { if !api.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to portal IP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
continue continue
} }
@ -255,23 +307,24 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
} }
if exists { if exists {
//Something changed. //Something changed.
glog.V(4).Infof("Something changed for service %q: removing it", serviceName) glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
delete(proxier.serviceMap, serviceName) delete(proxier.serviceMap, serviceName)
} }
serviceIP := net.ParseIP(service.Spec.ClusterIP) serviceIP := net.ParseIP(service.Spec.ClusterIP)
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info = newServiceInfo(serviceName) info = newServiceInfo(serviceName)
info.portal.ip = serviceIP info.clusterIP = serviceIP
info.portal.port = servicePort.Port info.port = servicePort.Port
info.portal.protocol = servicePort.Protocol info.protocol = servicePort.Protocol
info.nodePort = servicePort.NodePort
info.deprecatedPublicIPs = service.Spec.DeprecatedPublicIPs info.deprecatedPublicIPs = service.Spec.DeprecatedPublicIPs
// Deep-copy in case the service instance changes // Deep-copy in case the service instance changes
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.sessionAffinityType = service.Spec.SessionAffinity info.sessionAffinityType = service.Spec.SessionAffinity
proxier.serviceMap[serviceName] = info proxier.serviceMap[serviceName] = info
glog.V(4).Infof("info: %+v", info) glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
} }
} }
@ -295,7 +348,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true proxier.haveReceivedEndpointsUpdate = true
glog.V(4).Infof("Received endpoints update notice: %+v", allEndpoints)
registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
// Update endpoints for services. // Update endpoints for services.
@ -347,6 +399,10 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.serviceMap[service].endpoints = nil proxier.serviceMap[service].endpoints = nil
} }
} }
if err := proxier.syncProxyRules(); err != nil {
glog.Errorf("Failed to sync iptables rules: %v", err)
}
} }
// used in OnEndpointsUpdate // used in OnEndpointsUpdate
@ -382,22 +438,23 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string {
return result return result
} }
// servicePortToServiceChain takes the ServicePortName for a // servicePortToServiceChain takes the ServicePortName for a service and
// service and returns the associated iptables chain // returns the associated iptables chain. This is computed by hashing (sha256)
// this is computed by hashing (sha256) then encoding to base64 and // then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
// truncating with the prefix "KUBE-SVC-" // this because Iptables Chain Names must be <= 28 chars long, and the longer
// We do this because Iptables Chain Names must be <= 28 chars long // they are the harder they are to read.
func servicePortToServiceChain(s proxy.ServicePortName) utiliptables.Chain { func servicePortToServiceChain(s proxy.ServicePortName, protocol string) utiliptables.Chain {
hash := sha256.Sum256([]byte(s.String())) hash := sha256.Sum256([]byte(s.String() + protocol))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain("KUBE-SVC-" + encoded[:19]) return utiliptables.Chain("KUBE-SVC-" + encoded[:16])
} }
// this is the same as servicePortToServiceChain but with the endpoint included essentially // This is the same as servicePortToServiceChain but with the endpoint
func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, endpoint string) utiliptables.Chain { // included.
hash := sha256.Sum256([]byte(s.String() + "_" + endpoint)) func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain {
hash := sha256.Sum256([]byte(s.String() + protocol + endpoint))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain("KUBE-SEP-" + encoded[:19]) return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
} }
// This is where all of the iptables-save/restore calls happen. // This is where all of the iptables-save/restore calls happen.
@ -406,27 +463,47 @@ func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, endpoint stri
func (proxier *Proxier) syncProxyRules() error { func (proxier *Proxier) syncProxyRules() error {
// don't sync rules till we've received services and endpoints // don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
glog.V(2).Info("not syncing iptables until Services and Endpoints have been received from master") glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return nil return nil
} }
glog.V(4).Infof("Syncing iptables rules.") glog.V(3).Infof("Syncing iptables rules")
// ensure main chain and rule connecting to output // Ensure main chains and rules are installed.
args := []string{"-j", string(iptablesServicesChain)} inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting}
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil { // Link the services chain.
return err for _, chain := range inputChains {
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil {
return err
}
comment := "kubernetes service portals; must be before nodeports"
args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil {
return err
}
} }
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainOutput, args...); err != nil { // Link the nodeports chain.
return err for _, chain := range inputChains {
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesNodePortsChain); err != nil {
return err
}
comment := "kubernetes service nodeports; must be after portals"
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(iptablesNodePortsChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, chain, args...); err != nil {
return err
}
} }
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPrerouting, args...); err != nil { // Link the output rules.
return err {
comment := "kubernetes service traffic requiring SNAT"
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"}
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
return err
}
} }
// Get iptables-save output so we can check for existing chains and rules. // Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingChains := make(map[utiliptables.Chain]string) existingChains := make(map[utiliptables.Chain]string)
// run iptables-save
iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableNAT) iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableNAT)
if err != nil { // if we failed to get any rules if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptable-save, syncing all rules. %s", err.Error()) glog.Errorf("Failed to execute iptable-save, syncing all rules. %s", err.Error())
@ -434,126 +511,213 @@ func (proxier *Proxier) syncProxyRules() error {
existingChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw) existingChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
} }
// for first line and chains chainsLines := bytes.NewBuffer(nil)
var chainsLines bytes.Buffer rulesLines := bytes.NewBuffer(nil)
// for the actual rules (which should be after the list of chains)
var rulesLines bytes.Buffer
// write table header // Write table header.
chainsLines.WriteString("*nat\n") writeLine(chainsLines, "*nat")
// Make sure we keep stats for the top-level chains, if they existed
// (which they should have because we created them above).
if chain, ok := existingChains[iptablesServicesChain]; ok { if chain, ok := existingChains[iptablesServicesChain]; ok {
chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) writeLine(chainsLines, chain)
} else { } else {
chainsLines.WriteString(makeChainLine(iptablesServicesChain)) writeLine(chainsLines, makeChainLine(iptablesServicesChain))
}
if chain, ok := existingChains[iptablesNodePortsChain]; ok {
writeLine(chainsLines, chain)
} else {
writeLine(chainsLines, makeChainLine(iptablesNodePortsChain))
} }
newHostChains := []utiliptables.Chain{} // Accumulate chains to keep.
newServiceChains := []utiliptables.Chain{} activeChains := make(map[utiliptables.Chain]bool) // use a map as a set
//Build rules for services // Build rules for each service.
for name, info := range proxier.serviceMap { for name, info := range proxier.serviceMap {
protocol := strings.ToLower((string)(info.portal.protocol)) protocol := strings.ToLower(string(info.protocol))
// get chain name
svcChain := servicePortToServiceChain(name) // Create the per-service chain, retaining counters if possible.
// Create chain svcChain := servicePortToServiceChain(name, protocol)
if chain, ok := existingChains[svcChain]; ok { if chain, ok := existingChains[svcChain]; ok {
chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) writeLine(chainsLines, chain)
} else { } else {
chainsLines.WriteString(makeChainLine(svcChain)) writeLine(chainsLines, makeChainLine(svcChain))
}
// get hosts and host-Chains
hosts := make([]string, 0)
hostChains := make([]utiliptables.Chain, 0)
for _, ep := range info.endpoints {
hosts = append(hosts, ep)
hostChains = append(hostChains, servicePortAndEndpointToServiceChain(name, ep))
} }
activeChains[svcChain] = true
// Ensure we know what chains to flush/remove next time we generate the rules // Capture the clusterIP.
newHostChains = append(newHostChains, hostChains...) writeLine(rulesLines,
newServiceChains = append(newServiceChains, svcChain) "-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", info.clusterIP.String()),
"--dport", fmt.Sprintf("%d", info.port),
"-j", string(svcChain))
// write chain and sticky session rule // Capture externalIPs.
for _, hostChain := range hostChains { for _, externalIP := range info.deprecatedPublicIPs {
// Create chain args := []string{
if chain, ok := existingChains[utiliptables.Chain(hostChain)]; ok { "-A", string(iptablesServicesChain),
chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()),
} else { "-m", protocol, "-p", protocol,
chainsLines.WriteString(makeChainLine(hostChain)) "-d", fmt.Sprintf("%s/32", externalIP),
} "--dport", fmt.Sprintf("%d", info.port),
// Sticky session
if info.sessionAffinityType == api.ServiceAffinityClientIP {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --rcheck --seconds %d --reap -j %s\n", svcChain, name.String(), hostChain, info.stickyMaxAgeMinutes*60, hostChain))
} }
// We have to SNAT packets from external IPs.
writeLine(rulesLines, append(args,
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
writeLine(rulesLines, append(args,
"-j", string(svcChain))...)
} }
// write proxy/loadbalancing rules // Capture load-balancer ingress.
n := len(hostChains)
for i, hostChain := range hostChains {
// Roughly round robin statistically if we have more than one host
if i < (n - 1) {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m statistic --mode random --probability %f -j %s\n", svcChain, name.String(), 1.0/float64(n-i), hostChain))
} else {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j %s\n", svcChain, name.String(), hostChain))
}
// proxy
if info.sessionAffinityType == api.ServiceAffinityClientIP {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --set -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), hostChain, protocol, hosts[i]))
} else {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), protocol, hosts[i]))
}
}
// proxy
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), info.portal.ip.String(), protocol, protocol, info.portal.port, svcChain))
for _, publicIP := range info.deprecatedPublicIPs {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"deprecated-PublicIP portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), publicIP, protocol, protocol, info.portal.port, svcChain))
}
for _, ingress := range info.loadBalancerStatus.Ingress { for _, ingress := range info.loadBalancerStatus.Ingress {
if ingress.IP != "" { if ingress.IP != "" {
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"load-balancer portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), ingress.IP, protocol, protocol, info.portal.port, svcChain)) args := []string{
"-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", ingress.IP),
"--dport", fmt.Sprintf("%d", info.port),
}
// We have to SNAT packets from external IPs.
writeLine(rulesLines, append(args,
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
writeLine(rulesLines, append(args,
"-j", string(svcChain))...)
} }
} }
// Capture nodeports. If we had more than 2 rules it might be
// worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden.
if info.nodePort != 0 {
// Nodeports need SNAT.
writeLine(rulesLines,
"-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", name.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", info.nodePort),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
// Jump to the service chain.
writeLine(rulesLines,
"-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", name.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", info.nodePort),
"-j", string(svcChain))
}
// Generate the per-endpoint chains. We do this in multiple passes so we
// can group rules together.
endpoints := make([]string, 0)
endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range info.endpoints {
endpoints = append(endpoints, ep)
endpointChain := servicePortAndEndpointToServiceChain(name, protocol, ep)
endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingChains[utiliptables.Chain(endpointChain)]; ok {
writeLine(chainsLines, chain)
} else {
writeLine(chainsLines, makeChainLine(endpointChain))
}
activeChains[endpointChain] = true
}
// First write session affinity rules, if applicable.
if info.sessionAffinityType == api.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
writeLine(rulesLines,
"-A", string(svcChain),
"-m", "comment", "--comment", name.String(),
"-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap",
"-j", string(endpointChain))
}
}
// Now write loadbalancing & DNAT rules.
n := len(endpointChains)
for i, endpointChain := range endpointChains {
// Balancing rules in the per-service chain.
args := []string{
"-A", string(svcChain),
"-m", "comment", "--comment", name.String(),
}
if i < (n - 1) {
// Each rule is a probabilistic match.
args = append(args,
"-m", "statistic",
"--mode", "random",
"--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)))
}
// The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain))
writeLine(rulesLines, args...)
// Rules in the per-endpoint chain.
args = []string{
"-A", string(endpointChain),
"-m", "comment", "--comment", name.String(),
}
// Handle traffic that loops back to the originator with SNAT.
// Technically we only need to do this if the endpoint is on this
// host, but we don't have that information, so we just do this for
// all endpoints.
// TODO: if we grow logic to get this node's pod CIDR, we can use it.
writeLine(rulesLines, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
// Update client-affinity lists.
if info.sessionAffinityType == api.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args,
"-m", protocol, "-p", protocol,
"-j", "DNAT", "--to-destination", endpoints[i])
writeLine(rulesLines, args...)
}
} }
// Delete chains no longer in use: // Delete chains no longer in use.
activeChains := make(map[utiliptables.Chain]bool) // use a map as a set
for _, chain := range newHostChains {
activeChains[chain] = true
}
for _, chain := range newServiceChains {
activeChains[chain] = true
}
for chain := range existingChains { for chain := range existingChains {
if !activeChains[chain] { if !activeChains[chain] {
chainString := string(chain) chainString := string(chain)
// Ignore chains that aren't ours.
if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") { if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") {
// Ignore chains that aren't ours.
continue continue
} }
rulesLines.WriteString(fmt.Sprintf("-F %s\n-X %s\n", chain, chain)) // We must (as per iptables) write a chain-line for it, which has
// the nice effect of flushing the chain. Then we can remove the
// chain.
writeLine(chainsLines, existingChains[chain])
writeLine(rulesLines, "-X", chainString)
} }
} }
// write end of table // Write the end-of-table marker.
rulesLines.WriteString("COMMIT\n") writeLine(rulesLines, "COMMIT")
// combine parts
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
// sync rules and return error // Sync rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
glog.V(3).Infof("Syncing rules: %s", lines) glog.V(3).Infof("Syncing rules: %s", lines)
// NOTE: flush=false is used so we don't flush non-kubernetes chains in the table. return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) }
return err
// Join all words with spaces, terminate with newline and write to buf.
func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n")
} }
// return an iptables-save/restore formatted chain line given a Chain // return an iptables-save/restore formatted chain line given a Chain
func makeChainLine(chain utiliptables.Chain) string { func makeChainLine(chain utiliptables.Chain) string {
return fmt.Sprintf(":%s - [0:0]\n", chain) return fmt.Sprintf(":%s - [0:0]", chain)
} }
// getChainLines parses a table's iptables-save data to find chains in the table. // getChainLines parses a table's iptables-save data to find chains in the table.

View File

@ -21,10 +21,10 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"regexp" "regexp"
"strconv"
"strings" "strings"
"sync" "sync"
"github.com/coreos/go-semver/semver"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
utilexec "k8s.io/kubernetes/pkg/util/exec" utilexec "k8s.io/kubernetes/pkg/util/exec"
@ -106,6 +106,10 @@ type FlushFlag bool
const FlushTables FlushFlag = true const FlushTables FlushFlag = true
const NoFlushTables FlushFlag = false const NoFlushTables FlushFlag = false
// Versions of iptables less than this do not support the -C / --check flag
// (test whether a rule exists).
const MinCheckVersion = "1.4.11"
// runner implements Interface in terms of exec("iptables"). // runner implements Interface in terms of exec("iptables").
type runner struct { type runner struct {
mu sync.Mutex mu sync.Mutex
@ -399,44 +403,24 @@ func makeFullArgs(table Table, chain Chain, args ...string) []string {
// Checks if iptables has the "-C" flag // Checks if iptables has the "-C" flag
func getIptablesHasCheckCommand(exec utilexec.Interface) (bool, error) { func getIptablesHasCheckCommand(exec utilexec.Interface) (bool, error) {
minVersion, err := semver.NewVersion(MinCheckVersion)
if err != nil {
return false, err
}
// Returns "vX.Y.Z".
vstring, err := GetIptablesVersionString(exec) vstring, err := GetIptablesVersionString(exec)
if err != nil { if err != nil {
return false, err return false, err
} }
// Make a semver of the part after the v in "vX.X.X".
v1, v2, v3, err := extractIptablesVersion(vstring) version, err := semver.NewVersion(vstring[1:])
if err != nil { if err != nil {
return false, err return false, err
} }
if version.LessThan(*minVersion) {
return iptablesHasCheckCommand(v1, v2, v3), nil return false, nil
}
// extractIptablesVersion returns the first three components of the iptables version.
// e.g. "iptables v1.3.66" would return (1, 3, 66, nil)
func extractIptablesVersion(str string) (int, int, int, error) {
versionMatcher := regexp.MustCompile("v([0-9]+)\\.([0-9]+)\\.([0-9]+)")
result := versionMatcher.FindStringSubmatch(str)
if result == nil {
return 0, 0, 0, fmt.Errorf("no iptables version found in string: %s", str)
} }
return true, nil
v1, err := strconv.Atoi(result[1])
if err != nil {
return 0, 0, 0, err
}
v2, err := strconv.Atoi(result[2])
if err != nil {
return 0, 0, 0, err
}
v3, err := strconv.Atoi(result[3])
if err != nil {
return 0, 0, 0, err
}
return v1, v2, v3, nil
} }
// GetIptablesVersionString runs "iptables --version" to get the version string, // GetIptablesVersionString runs "iptables --version" to get the version string,
@ -455,17 +439,3 @@ func GetIptablesVersionString(exec utilexec.Interface) (string, error) {
} }
return match[0], nil return match[0], nil
} }
// Checks if an iptables version is after 1.4.11, when --check was added
func iptablesHasCheckCommand(v1 int, v2 int, v3 int) bool {
if v1 > 1 {
return true
}
if v1 == 1 && v2 > 4 {
return true
}
if v1 == 1 && v2 == 4 && v3 >= 11 {
return true
}
return false
}

View File

@ -438,65 +438,6 @@ func TestGetIptablesHasCheckCommand(t *testing.T) {
} }
} }
func TestExtractIptablesVersion(t *testing.T) {
testCases := []struct {
Version string
V1 int
V2 int
V3 int
Err bool
}{
{"iptables v1.4.7", 1, 4, 7, false},
{"iptables v1.4.11", 1, 4, 11, false},
{"iptables v0.2.5", 0, 2, 5, false},
{"iptables v1.2.3.4.5.6", 1, 2, 3, false},
{"iptables v1.4", 0, 0, 0, true},
{"iptables v12345.12345.12345.12344", 12345, 12345, 12345, false},
{"total junk", 0, 0, 0, true},
}
for _, testCase := range testCases {
v1, v2, v3, err := extractIptablesVersion(testCase.Version)
if (err != nil) != testCase.Err {
t.Errorf("Expected error: %v, Got error: %v", testCase.Err, err)
}
if err == nil {
if v1 != testCase.V1 {
t.Errorf("First version number incorrect for string \"%s\", got %d, expected %d", testCase.Version, v1, testCase.V1)
}
if v2 != testCase.V2 {
t.Errorf("Second version number incorrect for string \"%s\", got %d, expected %d", testCase.Version, v2, testCase.V2)
}
if v3 != testCase.V3 {
t.Errorf("Third version number incorrect for string \"%s\", got %d, expected %d", testCase.Version, v3, testCase.V3)
}
}
}
}
func TestIptablesHasCheckCommand(t *testing.T) {
testCases := []struct {
V1 int
V2 int
V3 int
Result bool
}{
{0, 55, 55, false},
{1, 0, 55, false},
{1, 4, 10, false},
{1, 4, 11, true},
{1, 4, 19, true},
{1, 5, 0, true},
{2, 0, 0, true},
}
for _, testCase := range testCases {
if result := iptablesHasCheckCommand(testCase.V1, testCase.V2, testCase.V3); result != testCase.Result {
t.Errorf("For %d.%d.%d expected %v got %v", testCase.V1, testCase.V2, testCase.V3, testCase.Result, result)
}
}
}
func TestCheckRuleWithoutCheckPresent(t *testing.T) { func TestCheckRuleWithoutCheckPresent(t *testing.T) {
iptables_save_output := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014 iptables_save_output := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
*nat *nat