From 1c25c2cd9954d2985d115e3dc876918222aff7db Mon Sep 17 00:00:00 2001 From: gmarek Date: Mon, 21 Sep 2015 12:05:22 +0200 Subject: [PATCH] Refactor KubeProxy to allow mocking of all moving parts. --- cmd/hyperkube/kube-proxy.go | 23 +++- cmd/kube-proxy/app/server.go | 213 +++++++++++++++++------------ cmd/kube-proxy/proxy.go | 12 +- contrib/mesos/cmd/km/kube-proxy.go | 24 +++- 4 files changed, 173 insertions(+), 99 deletions(-) diff --git a/cmd/hyperkube/kube-proxy.go b/cmd/hyperkube/kube-proxy.go index 427d776a67f..c28a30f607e 100644 --- a/cmd/hyperkube/kube-proxy.go +++ b/cmd/hyperkube/kube-proxy.go @@ -19,24 +19,35 @@ limitations under the License. package main import ( + "fmt" + "os" + kubeproxy "k8s.io/kubernetes/cmd/kube-proxy/app" ) // NewKubeProxy creates a new hyperkube Server object that includes the // description and flags. func NewKubeProxy() *Server { - s := kubeproxy.NewProxyServer() + config := kubeproxy.NewProxyConfig() hks := Server{ SimpleUsage: "proxy", Long: `The Kubernetes proxy server is responsible for taking traffic directed at - services and forwarding it to the appropriate pods. It generally runs on + services and forwarding it to the appropriate pods. It generally runs on nodes next to the Kubelet and proxies traffic from local pods to remote pods. It is also used when handling incoming external traffic.`, - Run: func(_ *Server, args []string) error { - return s.Run(args) - }, } - s.AddFlags(hks.Flags()) + + config.AddFlags(hks.Flags()) + s, err := kubeproxy.NewProxyServerDefault(config) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + + hks.Run = func(_ *Server, args []string) error { + return s.Run(args) + } + return &hks } diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 59620345266..3e7948f6c6a 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -28,12 +28,12 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" + kubeclient "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/proxy/config" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/types" @@ -48,8 +48,8 @@ import ( "github.com/spf13/pflag" ) -// ProxyServer contains configures and runs a Kubernetes proxy server -type ProxyServer struct { +// ProxyServerConfig contains configures and runs a Kubernetes proxy server +type ProxyServerConfig struct { BindAddress net.IP HealthzPort int HealthzBindAddress net.IP @@ -58,7 +58,6 @@ type ProxyServer struct { Master string Kubeconfig string PortRange util.PortRange - Recorder record.EventRecorder HostnameOverride string ProxyMode string SyncPeriod time.Duration @@ -67,20 +66,20 @@ type ProxyServer struct { CleanupAndExit bool } -// NewProxyServer creates a new ProxyServer object with default parameters -func NewProxyServer() *ProxyServer { - return &ProxyServer{ - BindAddress: net.ParseIP("0.0.0.0"), - HealthzPort: 10249, - HealthzBindAddress: net.ParseIP("127.0.0.1"), - OOMScoreAdj: qos.KubeProxyOomScoreAdj, - ResourceContainer: "/kube-proxy", - SyncPeriod: 5 * time.Second, - } +type ProxyServer struct { + Client *kubeclient.Client + Config *ProxyServerConfig + EndpointsConfig *proxyconfig.EndpointsConfig + EndpointsHandler proxyconfig.EndpointsConfigHandler + IptInterface utiliptables.Interface + OomAdjuster *oom.OomAdjuster + Proxier proxy.ProxyProvider + Recorder record.EventRecorder + ServiceConfig *proxyconfig.ServiceConfig } // AddFlags adds flags for a specific ProxyServer to the specified FlagSet -func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) { +func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) { fs.IPVar(&s.BindAddress, "bind-address", s.BindAddress, "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.") @@ -110,87 +109,110 @@ func checkKnownProxyMode(proxyMode string) bool { return false } -// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set). -func (s *ProxyServer) Run(_ []string) error { +func NewProxyConfig() *ProxyServerConfig { + return &ProxyServerConfig{ + BindAddress: net.ParseIP("0.0.0.0"), + HealthzPort: 10249, + HealthzBindAddress: net.ParseIP("127.0.0.1"), + OOMScoreAdj: qos.KubeProxyOomScoreAdj, + ResourceContainer: "/kube-proxy", + SyncPeriod: 5 * time.Second, + } +} + +func NewProxyServer( + config *ProxyServerConfig, + client *kubeclient.Client, + endpointsConfig *proxyconfig.EndpointsConfig, + endpointsHandler proxyconfig.EndpointsConfigHandler, + iptInterface utiliptables.Interface, + oomAdjuster *oom.OomAdjuster, + proxier proxy.ProxyProvider, + recorder record.EventRecorder, + serviceConfig *proxyconfig.ServiceConfig, +) (*ProxyServer, error) { + return &ProxyServer{ + Client: client, + Config: config, + EndpointsConfig: endpointsConfig, + EndpointsHandler: endpointsHandler, + IptInterface: iptInterface, + OomAdjuster: oomAdjuster, + Proxier: proxier, + Recorder: recorder, + ServiceConfig: serviceConfig, + }, nil +} + +// NewProxyServerDefault creates a new ProxyServer object with default parameters. +func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) { protocol := utiliptables.ProtocolIpv4 - if s.BindAddress.To4() == nil { + if config.BindAddress.To4() == nil { protocol = utiliptables.ProtocolIpv6 } - // remove iptables rules and exit - if s.CleanupAndExit { + // We ommit creation of pretty much everything if we run in cleanup mode + if config.CleanupAndExit { execer := exec.New() dbus := utildbus.New() - ipt := utiliptables.New(execer, dbus, protocol) - encounteredError := userspace.CleanupLeftovers(ipt) - encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError - if encounteredError { - return errors.New("Encountered an error while tearing down rules.") - } - return nil + IptInterface := utiliptables.New(execer, dbus, protocol) + return &ProxyServer{ + IptInterface: IptInterface, + }, nil } // TODO(vmarmol): Use container config for this. - oomAdjuster := oom.NewOomAdjuster() - if err := oomAdjuster.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil { - glog.V(2).Info(err) + var oomAdjuster *oom.OomAdjuster + if config.OOMScoreAdj != 0 { + oomAdjuster := oom.NewOomAdjuster() + if err := oomAdjuster.ApplyOomScoreAdj(0, config.OOMScoreAdj); err != nil { + glog.V(2).Info(err) + } } - // Run in its own container. - if err := util.RunInResourceContainer(s.ResourceContainer); err != nil { - glog.Warningf("Failed to start in resource-only container %q: %v", s.ResourceContainer, err) - } else { - glog.V(2).Infof("Running in resource-only container %q", s.ResourceContainer) + if config.ResourceContainer != "" { + // Run in its own container. + if err := util.RunInResourceContainer(config.ResourceContainer); err != nil { + glog.Warningf("Failed to start in resource-only container %q: %v", config.ResourceContainer, err) + } else { + glog.V(2).Infof("Running in resource-only container %q", config.ResourceContainer) + } } + // Create a Kube Client // define api config source - if s.Kubeconfig == "" && s.Master == "" { + if config.Kubeconfig == "" && config.Master == "" { glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.") } - // This creates a client, first loading any specified kubeconfig // file, and then overriding the Master flag, if non-empty. kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( - &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.Kubeconfig}, - &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.Master}}).ClientConfig() + &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig}, + &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.Master}}).ClientConfig() if err != nil { - return err + return nil, err } - - client, err := client.New(kubeconfig) + client, err := kubeclient.New(kubeconfig) if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } - // Add event recorder - hostname := nodeutil.GetHostname(s.HostnameOverride) + // Create event recorder + hostname := nodeutil.GetHostname(config.HostnameOverride) eventBroadcaster := record.NewBroadcaster() - s.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname}) + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname}) eventBroadcaster.StartRecordingToSink(client.Events("")) - s.nodeRef = &api.ObjectReference{ - Kind: "Node", - Name: hostname, - UID: types.UID(hostname), - Namespace: "", - } - - serviceConfig := config.NewServiceConfig() - endpointsConfig := config.NewEndpointsConfig() - - var proxier proxy.ProxyProvider - var endpointsHandler config.EndpointsConfigHandler - + // Create a iptables utils. execer := exec.New() dbus := utildbus.New() - ipt := utiliptables.New(execer, dbus, protocol) + iptInterface := utiliptables.New(execer, dbus, protocol) - if !checkKnownProxyMode(s.ProxyMode) { - glog.Fatalf("Unknown proxy-mode flag: %s", s.ProxyMode) - } + var proxier proxy.ProxyProvider + var endpointsHandler proxyconfig.EndpointsConfigHandler useIptablesProxy := false - if mayTryIptablesProxy(s.ProxyMode, client.Nodes(), hostname) { + if mayTryIptablesProxy(config.ProxyMode, client.Nodes(), hostname) { var err error // guaranteed false on error, error only necessary for debugging useIptablesProxy, err = iptables.ShouldUseIptablesProxier() @@ -201,8 +223,8 @@ func (s *ProxyServer) Run(_ []string) error { if useIptablesProxy { glog.V(2).Info("Using iptables Proxier.") - - proxierIptables, err := iptables.NewProxier(ipt, execer, s.SyncPeriod, s.MasqueradeAll) + execer := exec.New() + proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.SyncPeriod, config.MasqueradeAll) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } @@ -210,7 +232,7 @@ func (s *ProxyServer) Run(_ []string) error { endpointsHandler = proxierIptables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.") - userspace.CleanupLeftovers(ipt) + userspace.CleanupLeftovers(iptInterface) } else { glog.V(2).Info("Using userspace Proxier.") // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for @@ -219,48 +241,71 @@ func (s *ProxyServer) Run(_ []string) error { // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer - proxierUserspace, err := userspace.NewProxier(loadBalancer, s.BindAddress, ipt, s.PortRange, s.SyncPeriod) + proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.SyncPeriod) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierUserspace // Remove artifacts from the pure-iptables Proxier. glog.V(2).Info("Tearing down pure-iptables proxy rules. Errors here are acceptable.") - iptables.CleanupLeftovers(ipt) + iptables.CleanupLeftovers(iptInterface) } + iptInterface.AddReloadFunc(proxier.Sync) - // Birth Cry after the birth is successful - s.birthCry() - - // Wire proxier to handle changes to services - serviceConfig.RegisterHandler(proxier) - // And wire endpointsHandler to handle changes to endpoints to services - endpointsConfig.RegisterHandler(endpointsHandler) - + // Create configs (i.e. Watches for Services and Endpoints) // Note: RegisterHandler() calls need to happen before creation of Sources because sources // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. + serviceConfig := proxyconfig.NewServiceConfig() + serviceConfig.RegisterHandler(proxier) - config.NewSourceAPI( + endpointsConfig := proxyconfig.NewEndpointsConfig() + endpointsConfig.RegisterHandler(endpointsHandler) + + proxyconfig.NewSourceAPI( client, 30*time.Second, serviceConfig.Channel("api"), endpointsConfig.Channel("api"), ) - if s.HealthzPort > 0 { + config.nodeRef = &api.ObjectReference{ + Kind: "Node", + Name: hostname, + UID: types.UID(hostname), + Namespace: "", + } + + return NewProxyServer(config, client, endpointsConfig, endpointsHandler, iptInterface, oomAdjuster, proxier, recorder, serviceConfig) +} + +// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set). +func (s *ProxyServer) Run(_ []string) error { + // remove iptables rules and exit + if s.Config.CleanupAndExit { + encounteredError := userspace.CleanupLeftovers(s.IptInterface) + encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError + if encounteredError { + return errors.New("Encountered an error while tearing down rules.") + } + return nil + } + + // Birth Cry after the birth is successful + s.birthCry() + + // Start up Healthz service if requested + if s.Config.HealthzPort > 0 { go util.Until(func() { - err := http.ListenAndServe(s.HealthzBindAddress.String()+":"+strconv.Itoa(s.HealthzPort), nil) + err := http.ListenAndServe(s.Config.HealthzBindAddress.String()+":"+strconv.Itoa(s.Config.HealthzPort), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } }, 5*time.Second, util.NeverStop) } - ipt.AddReloadFunc(proxier.Sync) - // Just loop forever for now... - proxier.SyncLoop() + s.Proxier.SyncLoop() return nil } @@ -303,5 +348,5 @@ func mayTryIptablesProxy(proxyMode string, client nodeGetter, hostname string) b } func (s *ProxyServer) birthCry() { - s.Recorder.Eventf(s.nodeRef, "Starting", "Starting kube-proxy.") + s.Recorder.Eventf(s.Config.nodeRef, "Starting", "Starting kube-proxy.") } diff --git a/cmd/kube-proxy/proxy.go b/cmd/kube-proxy/proxy.go index fbcb1f611fa..85e0cee492e 100644 --- a/cmd/kube-proxy/proxy.go +++ b/cmd/kube-proxy/proxy.go @@ -35,8 +35,8 @@ func init() { func main() { runtime.GOMAXPROCS(runtime.NumCPU()) - s := app.NewProxyServer() - s.AddFlags(pflag.CommandLine) + config := app.NewProxyConfig() + config.AddFlags(pflag.CommandLine) util.InitFlags() util.InitLogs() @@ -44,7 +44,13 @@ func main() { verflag.PrintAndExitIfRequested() - if err := s.Run(pflag.CommandLine.Args()); err != nil { + s, err := app.NewProxyServerDefault(config) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + + if err = s.Run(pflag.CommandLine.Args()); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } diff --git a/contrib/mesos/cmd/km/kube-proxy.go b/contrib/mesos/cmd/km/kube-proxy.go index d410441d6c5..eebe1f6e5a9 100644 --- a/contrib/mesos/cmd/km/kube-proxy.go +++ b/contrib/mesos/cmd/km/kube-proxy.go @@ -18,25 +18,37 @@ limitations under the License. package main import ( + "fmt" + "os" + kubeproxy "k8s.io/kubernetes/cmd/kube-proxy/app" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" ) // NewKubeProxy creates a new hyperkube Server object that includes the // description and flags. + func NewKubeProxy() *Server { - s := kubeproxy.NewProxyServer() + config := kubeproxy.NewProxyConfig() hks := Server{ SimpleUsage: hyperkube.CommandProxy, Long: `The Kubernetes proxy server is responsible for taking traffic directed at - services and forwarding it to the appropriate pods. It generally runs on + services and forwarding it to the appropriate pods. It generally runs on nodes next to the Kubelet and proxies traffic from local pods to remote pods. It is also used when handling incoming external traffic.`, - Run: func(_ *Server, args []string) error { - return s.Run(args) - }, } - s.AddFlags(hks.Flags()) + + config.AddFlags(hks.Flags()) + s, err := kubeproxy.NewProxyServerDefault(config) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + + hks.Run = func(_ *Server, args []string) error { + return s.Run(args) + } + return &hks }