Add minimum iptables sync period to the proxy, default is 2/sec

This commit is contained in:
Timothy St. Clair 2016-10-21 04:40:46 -05:00
parent a96f028208
commit 2b012e822a
13 changed files with 70 additions and 29 deletions

View File

@ -77,7 +77,8 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.HostnameOverride, "hostname-override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
fs.Var(&s.Mode, "proxy-mode", "Which proxy mode to use: 'userspace' (older) or 'iptables' (faster). If blank, look at the Node object on the Kubernetes API and respect the '"+ExperimentalProxyModeAnnotation+"' annotation if provided. Otherwise use the best-available proxy (currently iptables). If the iptables proxy is selected, regardless of how, but the system's kernel or iptables versions are insufficient, this always falls back to the userspace proxy.")
fs.Int32Var(s.IPTablesMasqueradeBit, "iptables-masquerade-bit", util.Int32PtrDerefOr(s.IPTablesMasqueradeBit, 14), "If using the pure iptables proxy, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].")
fs.DurationVar(&s.IPTablesSyncPeriod.Duration, "iptables-sync-period", s.IPTablesSyncPeriod.Duration, "How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&s.IPTablesSyncPeriod.Duration, "iptables-sync-period", s.IPTablesSyncPeriod.Duration, "The maximum interval of how often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&s.IPTablesMinSyncPeriod.Duration, "iptables-min-sync-period", s.IPTablesMinSyncPeriod.Duration, "The minimum interval of how often the iptables rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&s.ConfigSyncPeriod, "config-sync-period", s.ConfigSyncPeriod, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
fs.BoolVar(&s.MasqueradeAll, "masquerade-all", s.MasqueradeAll, "If using the pure iptables proxy, SNAT everything")
fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "The CIDR range of pods in the cluster. It is used to bridge traffic coming from outside of the cluster. If not provided, no off-cluster bridging will be performed.")

View File

@ -206,7 +206,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// IPTablesMasqueradeBit must be specified or defaulted.
return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
}
proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
@ -229,6 +229,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
iptInterface,
*utilnet.ParsePortRangeOrDie(config.PortRange),
config.IPTablesSyncPeriod.Duration,
config.IPTablesMinSyncPeriod.Duration,
config.UDPIdleTimeout.Duration,
)
if err != nil {

View File

@ -272,6 +272,7 @@ instance-metadata
instance-name-prefix
iptables-drop-bit
iptables-masquerade-bit
iptables-min-sync-period
iptables-sync-period
ir-data-source
ir-dbname

View File

@ -44,6 +44,9 @@ type KubeProxyConfiguration struct {
// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
// kubeconfigPath is the path to the kubeconfig file with authorization information (the
// master location is set by the master flag).
KubeconfigPath string `json:"kubeconfigPath"`

View File

@ -80,6 +80,9 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
if obj.IPTablesSyncPeriod.Duration == 0 {
obj.IPTablesSyncPeriod = unversioned.Duration{Duration: 30 * time.Second}
}
if obj.IPTablesMinSyncPeriod.Duration == 0 {
obj.IPTablesMinSyncPeriod = unversioned.Duration{Duration: 2 * time.Second}
}
zero := unversioned.Duration{}
if obj.UDPIdleTimeout == zero {
obj.UDPIdleTimeout = unversioned.Duration{Duration: 250 * time.Millisecond}

View File

@ -41,6 +41,9 @@ type KubeProxyConfiguration struct {
// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
// kubeconfigPath is the path to the kubeconfig file with authorization information (the
// master location is set by the master flag).
KubeconfigPath string `json:"kubeconfigPath"`

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/types:go_default_library",
"//pkg/util/config:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/slice:go_default_library",

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/types"
featuregate "k8s.io/kubernetes/pkg/util/config"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/slice"
@ -167,9 +168,11 @@ type Proxier struct {
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
throttle flowcontrol.RateLimiter
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
@ -217,7 +220,12 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
func NewProxier(ipt utiliptables.Interface, sysctl utilsysctl.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string, nodeIP net.IP) (*Proxier, error) {
func NewProxier(ipt utiliptables.Interface, sysctl utilsysctl.Interface, exec utilexec.Interface, syncPeriod time.Duration, minSyncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string, nodeIP net.IP) (*Proxier, error) {
// check valid user input
if minSyncPeriod == 0 || minSyncPeriod > syncPeriod {
return nil, fmt.Errorf("min-sync (%v) must be < sync(%v) and > 0 ", minSyncPeriod, syncPeriod)
}
// Set the route_localnet sysctl we need for
if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
@ -244,11 +252,16 @@ func NewProxier(ipt utiliptables.Interface, sysctl utilsysctl.Interface, exec ut
go healthcheck.Run()
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
// The average use case will process 2 updates in short succession
throttle: flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2),
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
@ -765,6 +778,9 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error {
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
if proxier.throttle != nil {
proxier.throttle.Accept()
}
start := time.Now()
defer func() {
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))

View File

@ -87,6 +87,7 @@ type Proxier struct {
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*serviceInfo
syncPeriod time.Duration
minSyncPeriod time.Duration // unused atm, but plumbed through
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
@ -139,7 +140,7 @@ func IsProxyLocked(err error) bool {
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
return nil, ErrProxyOnLocalhost
}
@ -157,10 +158,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
proxyPorts := newPortAllocator(pr)
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, udpIdleTimeout)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout)
}
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
// convenient to pass nil for tests..
if proxyPorts == nil {
proxyPorts = newPortAllocator(utilnet.PortRange{})
@ -175,10 +176,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
return nil, fmt.Errorf("failed to flush iptables: %v", err)
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
// plumbed through if needed, not used atm.
minSyncPeriod: minSyncPeriod,
udpIdleTimeout: udpIdleTimeout,
listenIP: listenIP,
iptables: iptables,

View File

@ -210,7 +210,7 @@ func TestTCPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -237,7 +237,7 @@ func TestUDPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -264,7 +264,7 @@ func TestUDPProxyTimeout(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -300,7 +300,7 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -327,7 +327,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -390,7 +390,7 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -434,7 +434,7 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -472,7 +472,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -509,7 +509,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -545,7 +545,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -598,7 +598,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -652,7 +652,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -700,7 +700,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -745,7 +745,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -796,7 +796,7 @@ func TestProxyUpdatePortal(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}

View File

@ -44,6 +44,9 @@ type KubeProxyConfiguration struct {
// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
// kubeconfigPath is the path to the kubeconfig file with authorization information (the
// master location is set by the master flag).
KubeconfigPath string `json:"kubeconfigPath"`

View File

@ -80,6 +80,9 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
if obj.IPTablesSyncPeriod.Duration == 0 {
obj.IPTablesSyncPeriod = unversioned.Duration{Duration: 30 * time.Second}
}
if obj.IPTablesMinSyncPeriod.Duration == 0 {
obj.IPTablesMinSyncPeriod = unversioned.Duration{Duration: 2 * time.Second}
}
zero := unversioned.Duration{}
if obj.UDPIdleTimeout == zero {
obj.UDPIdleTimeout = unversioned.Duration{Duration: 250 * time.Millisecond}

View File

@ -41,6 +41,9 @@ type KubeProxyConfiguration struct {
// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
// '2h22m'). Must be greater than 0.
IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
// kubeconfigPath is the path to the kubeconfig file with authorization information (the
// master location is set by the master flag).
KubeconfigPath string `json:"kubeconfigPath"`