update proxy server

This commit is contained in:
Mike Danese 2016-01-05 17:58:51 -08:00
parent 60c5db5e87
commit 309eac5f7f
6 changed files with 128 additions and 54 deletions

View File

@ -18,13 +18,14 @@ limitations under the License.
package options package options
import ( import (
"net"
_ "net/http/pprof" _ "net/http/pprof"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util"
"github.com/spf13/pflag" "github.com/spf13/pflag"
) )
@ -33,67 +34,59 @@ const (
ExperimentalProxyModeAnnotation = "net.experimental.kubernetes.io/proxy-mode" ExperimentalProxyModeAnnotation = "net.experimental.kubernetes.io/proxy-mode"
) )
// ProxyServerConfig contains configurations for a Kubernetes proxy server // ProxyServerConfig configures and runs a Kubernetes proxy server
type ProxyServerConfig struct { type ProxyServerConfig struct {
BindAddress net.IP componentconfig.KubeProxyConfiguration
HealthzPort int
HealthzBindAddress net.IP
OOMScoreAdj int
ResourceContainer string ResourceContainer string
Master string
Kubeconfig string
PortRange utilnet.PortRange
HostnameOverride string
ProxyMode string
IptablesSyncPeriod time.Duration
ConfigSyncPeriod time.Duration
NodeRef *api.ObjectReference // Reference to this node.
MasqueradeAll bool
CleanupAndExit bool
KubeAPIQPS float32 KubeAPIQPS float32
KubeAPIBurst int KubeAPIBurst int
UDPIdleTimeout time.Duration ConfigSyncPeriod time.Duration
ConntrackMax int CleanupAndExit bool
ConntrackTCPTimeoutEstablished int // seconds NodeRef *api.ObjectReference
Master string
Kubeconfig string
} }
func NewProxyConfig() *ProxyServerConfig { func NewProxyConfig() *ProxyServerConfig {
return &ProxyServerConfig{ return &ProxyServerConfig{
BindAddress: net.ParseIP("0.0.0.0"), KubeProxyConfiguration: componentconfig.KubeProxyConfiguration{
BindAddress: "0.0.0.0",
HealthzPort: 10249, HealthzPort: 10249,
HealthzBindAddress: net.ParseIP("127.0.0.1"), HealthzBindAddress: "127.0.0.1",
OOMScoreAdj: qos.KubeProxyOOMScoreAdj, OOMScoreAdj: util.IntPtr(qos.KubeProxyOOMScoreAdj),
ResourceContainer: "/kube-proxy", ResourceContainer: "/kube-proxy",
IptablesSyncPeriod: 30 * time.Second, IPTablesSyncPeriod: unversioned.Duration{30 * time.Second},
ConfigSyncPeriod: 15 * time.Minute, UDPIdleTimeout: unversioned.Duration{250 * time.Millisecond},
Mode: componentconfig.ProxyModeUserspace,
ConntrackMax: 256 * 1024, // 4x default (64k)
ConntrackTCPEstablishedTimeout: unversioned.Duration{Duration: 24 * time.Hour}, // 1 day (1/5 default)
},
KubeAPIQPS: 5.0, KubeAPIQPS: 5.0,
KubeAPIBurst: 10, KubeAPIBurst: 10,
UDPIdleTimeout: 250 * time.Millisecond, ConfigSyncPeriod: 15 * time.Minute,
ConntrackMax: 256 * 1024, // 4x default (64k)
ConntrackTCPTimeoutEstablished: 86400, // 1 day (1/5 default)
} }
} }
// AddFlags adds flags for a specific ProxyServer to the specified FlagSet // AddFlags adds flags for a specific ProxyServer to the specified FlagSet
func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) { func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.IPVar(&s.BindAddress, "bind-address", s.BindAddress, "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") fs.Var(componentconfig.IPVar{&s.BindAddress}, "bind-address", "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.") fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.")
fs.IPVar(&s.HealthzBindAddress, "healthz-bind-address", s.HealthzBindAddress, "The IP address for the health check server to serve on, defaulting to 127.0.0.1 (set to 0.0.0.0 for all interfaces)") fs.Var(componentconfig.IPVar{&s.HealthzBindAddress}, "healthz-bind-address", "The IP address for the health check server to serve on, defaulting to 127.0.0.1 (set to 0.0.0.0 for all interfaces)")
fs.IntVar(&s.OOMScoreAdj, "oom-score-adj", s.OOMScoreAdj, "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]") fs.IntVar(s.OOMScoreAdj, "oom-score-adj", util.IntPtrDerefOr(s.OOMScoreAdj, qos.KubeProxyOOMScoreAdj), "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]")
fs.StringVar(&s.ResourceContainer, "resource-container", s.ResourceContainer, "Absolute name of the resource-only container to create and run the Kube-proxy in (Default: /kube-proxy).") fs.StringVar(&s.ResourceContainer, "resource-container", s.ResourceContainer, "Absolute name of the resource-only container to create and run the Kube-proxy in (Default: /kube-proxy).")
fs.MarkDeprecated("resource-container", "This feature will be removed in a later release.") fs.MarkDeprecated("resource-container", "This feature will be removed in a later release.")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
fs.Var(&s.PortRange, "proxy-port-range", "Range of host ports (beginPort-endPort, inclusive) that may be consumed in order to proxy service traffic. If unspecified (0-0) then ports will be randomly chosen.") fs.Var(componentconfig.PortRangeVar{&s.PortRange}, "proxy-port-range", "Range of host ports (beginPort-endPort, inclusive) that may be consumed in order to proxy service traffic. If unspecified (0-0) then ports will be randomly chosen.")
fs.StringVar(&s.HostnameOverride, "hostname-override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.") fs.StringVar(&s.HostnameOverride, "hostname-override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
fs.StringVar(&s.ProxyMode, "proxy-mode", "", "Which proxy mode to use: 'userspace' (older) or 'iptables' (faster). If blank, look at the Node object on the Kubernetes API and respect the '"+ExperimentalProxyModeAnnotation+"' annotation if provided. Otherwise use the best-available proxy (currently iptables). If the iptables proxy is selected, regardless of how, but the system's kernel or iptables versions are insufficient, this always falls back to the userspace proxy.") fs.Var(&s.Mode, "proxy-mode", "Which proxy mode to use: 'userspace' (older) or 'iptables' (faster). If blank, look at the Node object on the Kubernetes API and respect the '"+ExperimentalProxyModeAnnotation+"' annotation if provided. Otherwise use the best-available proxy (currently iptables). If the iptables proxy is selected, regardless of how, but the system's kernel or iptables versions are insufficient, this always falls back to the userspace proxy.")
fs.DurationVar(&s.IptablesSyncPeriod, "iptables-sync-period", s.IptablesSyncPeriod, "How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") fs.DurationVar(&s.IPTablesSyncPeriod.Duration, "iptables-sync-period", s.IPTablesSyncPeriod.Duration, "How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&s.ConfigSyncPeriod, "config-sync-period", s.ConfigSyncPeriod, "How often configuration from the apiserver is refreshed. Must be greater than 0.") fs.DurationVar(&s.ConfigSyncPeriod, "config-sync-period", s.ConfigSyncPeriod, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
fs.BoolVar(&s.MasqueradeAll, "masquerade-all", false, "If using the pure iptables proxy, SNAT everything") fs.BoolVar(&s.MasqueradeAll, "masquerade-all", false, "If using the pure iptables proxy, SNAT everything")
fs.BoolVar(&s.CleanupAndExit, "cleanup-iptables", false, "If true cleanup iptables rules and exit.") fs.BoolVar(&s.CleanupAndExit, "cleanup-iptables", false, "If true cleanup iptables rules and exit.")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver") fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.DurationVar(&s.UDPIdleTimeout, "udp-timeout", s.UDPIdleTimeout, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace") fs.DurationVar(&s.UDPIdleTimeout.Duration, "udp-timeout", s.UDPIdleTimeout.Duration, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace")
fs.IntVar(&s.ConntrackMax, "conntrack-max", s.ConntrackMax, "Maximum number of NAT connections to track (0 to leave as-is)") fs.IntVar(&s.ConntrackMax, "conntrack-max", s.ConntrackMax, "Maximum number of NAT connections to track (0 to leave as-is)")
fs.IntVar(&s.ConntrackTCPTimeoutEstablished, "conntrack-tcp-timeout-established", s.ConntrackTCPTimeoutEstablished, "Idle timeout for established TCP connections (0 to leave as-is)") fs.DurationVar(&s.ConntrackTCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", s.ConntrackTCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
} }

View File

@ -20,6 +20,7 @@ package app
import ( import (
"errors" "errors"
"net"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"strconv" "strconv"
@ -40,6 +41,7 @@ import (
utildbus "k8s.io/kubernetes/pkg/util/dbus" utildbus "k8s.io/kubernetes/pkg/util/dbus"
"k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
@ -116,7 +118,7 @@ with the apiserver API to configure the proxy.`,
// NewProxyServerDefault creates a new ProxyServer object with default parameters. // NewProxyServerDefault creates a new ProxyServer object with default parameters.
func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) { func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) {
protocol := utiliptables.ProtocolIpv4 protocol := utiliptables.ProtocolIpv4
if config.BindAddress.To4() == nil { if net.ParseIP(config.BindAddress).To4() == nil {
protocol = utiliptables.ProtocolIpv6 protocol = utiliptables.ProtocolIpv6
} }
@ -135,9 +137,9 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// TODO(vmarmol): Use container config for this. // TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster var oomAdjuster *oom.OOMAdjuster
if config.OOMScoreAdj != 0 { if config.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster() oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, config.OOMScoreAdj); err != nil { if err := oomAdjuster.ApplyOOMScoreAdj(0, *config.OOMScoreAdj); err != nil {
glog.V(2).Info(err) glog.V(2).Info(err)
} }
} }
@ -182,10 +184,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
var proxier proxy.ProxyProvider var proxier proxy.ProxyProvider
var endpointsHandler proxyconfig.EndpointsConfigHandler var endpointsHandler proxyconfig.EndpointsConfigHandler
proxyMode := getProxyMode(config.ProxyMode, client.Nodes(), hostname, iptInterface) proxyMode := getProxyMode(string(config.Mode), client.Nodes(), hostname, iptInterface)
if proxyMode == proxyModeIptables { if proxyMode == proxyModeIptables {
glog.V(2).Info("Using iptables Proxier.") glog.V(2).Info("Using iptables Proxier.")
proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IptablesSyncPeriod, config.MasqueradeAll) proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll)
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
@ -202,7 +204,14 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// set EndpointsConfigHandler to our loadBalancer // set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer endpointsHandler = loadBalancer
proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.IptablesSyncPeriod, config.UDPIdleTimeout) proxierUserspace, err := userspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
iptInterface,
*utilnet.ParsePortRangeOrDie(config.PortRange),
config.IPTablesSyncPeriod.Duration,
config.UDPIdleTimeout.Duration,
)
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
@ -259,7 +268,7 @@ func (s *ProxyServer) Run() error {
// Start up Healthz service if requested // Start up Healthz service if requested
if s.Config.HealthzPort > 0 { if s.Config.HealthzPort > 0 {
go util.Until(func() { go util.Until(func() {
err := http.ListenAndServe(s.Config.HealthzBindAddress.String()+":"+strconv.Itoa(s.Config.HealthzPort), nil) err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(s.Config.HealthzPort), nil)
if err != nil { if err != nil {
glog.Errorf("Starting health server failed: %v", err) glog.Errorf("Starting health server failed: %v", err)
} }
@ -273,8 +282,8 @@ func (s *ProxyServer) Run() error {
return err return err
} }
} }
if s.Config.ConntrackTCPTimeoutEstablished > 0 { if s.Config.ConntrackTCPEstablishedTimeout.Duration > 0 {
if err := s.Conntracker.SetTCPEstablishedTimeout(s.Config.ConntrackTCPTimeoutEstablished); err != nil { if err := s.Conntracker.SetTCPEstablishedTimeout(int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)); err != nil {
return err return err
} }
} }

View File

@ -19,9 +19,13 @@ package componentconfig
import ( import (
"fmt" "fmt"
"net" "net"
utilnet "k8s.io/kubernetes/pkg/util/net"
) )
// used for validating command line ip addresses. // used for validating command line opts
// TODO(mikedanese): remove these when we remove command line flags
type IPVar struct { type IPVar struct {
Val *string Val *string
} }
@ -48,3 +52,47 @@ func (v IPVar) String() string {
func (v IPVar) Type() string { func (v IPVar) Type() string {
return "ip" return "ip"
} }
func (m *ProxyMode) Set(s string) error {
nm := ProxyMode(s)
m = &nm
return nil
}
func (m *ProxyMode) String() string {
if m != nil {
return string(*m)
}
return ""
}
func (m *ProxyMode) Type() string {
return "ProxyMode"
}
type PortRangeVar struct {
Val *string
}
func (v PortRangeVar) Set(s string) error {
if _, err := utilnet.ParsePortRange(s); err != nil {
return fmt.Errorf("%q is not a valid port range: %v", s, err)
}
if v.Val == nil {
// it's okay to panic here since this is programmer error
panic("the string pointer passed into PortRangeVar should not be nil")
}
*v.Val = s
return nil
}
func (v PortRangeVar) String() string {
if v.Val == nil {
return ""
}
return *v.Val
}
func (v PortRangeVar) Type() string {
return "port-range"
}

View File

@ -26,6 +26,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config" proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"github.com/golang/glog" "github.com/golang/glog"
@ -59,7 +60,7 @@ func NewHollowProxyOrDie(
) *HollowProxy { ) *HollowProxy {
// Create and start Hollow Proxy // Create and start Hollow Proxy
config := options.NewProxyConfig() config := options.NewProxyConfig()
config.OOMScoreAdj = 0 config.OOMScoreAdj = util.IntPtr(0)
config.ResourceContainer = "" config.ResourceContainer = ""
config.NodeRef = &api.ObjectReference{ config.NodeRef = &api.ObjectReference{
Kind: "Node", Kind: "Node",

View File

@ -98,3 +98,11 @@ func ParsePortRange(value string) (*PortRange, error) {
} }
return pr, nil return pr, nil
} }
func ParsePortRangeOrDie(value string) *PortRange {
pr, err := ParsePortRange(value)
if err != nil {
panic(fmt.Sprintf("couldn't parse port range: %v"))
}
return pr
}

View File

@ -238,3 +238,18 @@ func ReadDirNoExit(dirname string) ([]os.FileInfo, []error, error) {
return list, errs, nil return list, errs, nil
} }
// IntPtr returns a pointer to an int
func IntPtr(i int) *int {
o := i
return &o
}
// IntPtrDerefOr derefrence the int ptr and returns it i not nil,
// else returns def.
func IntPtrDerefOr(ptr *int, def int) int {
if ptr != nil {
return *ptr
}
return def
}