Refactor KubeProxy to allow mocking of all moving parts.

This commit is contained in:
gmarek 2015-09-21 12:05:22 +02:00
parent 45a8b5f98a
commit 1c25c2cd99
4 changed files with 173 additions and 99 deletions

View File

@ -19,24 +19,35 @@ limitations under the License.
package main package main
import ( import (
"fmt"
"os"
kubeproxy "k8s.io/kubernetes/cmd/kube-proxy/app" kubeproxy "k8s.io/kubernetes/cmd/kube-proxy/app"
) )
// NewKubeProxy creates a new hyperkube Server object that includes the // NewKubeProxy creates a new hyperkube Server object that includes the
// description and flags. // description and flags.
func NewKubeProxy() *Server { func NewKubeProxy() *Server {
s := kubeproxy.NewProxyServer() config := kubeproxy.NewProxyConfig()
hks := Server{ hks := Server{
SimpleUsage: "proxy", SimpleUsage: "proxy",
Long: `The Kubernetes proxy server is responsible for taking traffic directed at Long: `The Kubernetes proxy server is responsible for taking traffic directed at
services and forwarding it to the appropriate pods. It generally runs on services and forwarding it to the appropriate pods. It generally runs on
nodes next to the Kubelet and proxies traffic from local pods to remote pods. nodes next to the Kubelet and proxies traffic from local pods to remote pods.
It is also used when handling incoming external traffic.`, It is also used when handling incoming external traffic.`,
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
} }
s.AddFlags(hks.Flags())
config.AddFlags(hks.Flags())
s, err := kubeproxy.NewProxyServerDefault(config)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
hks.Run = func(_ *Server, args []string) error {
return s.Run(args)
}
return &hks return &hks
} }

View File

@ -28,12 +28,12 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" kubeclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/config" proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
@ -48,8 +48,8 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
) )
// ProxyServer contains configures and runs a Kubernetes proxy server // ProxyServerConfig contains configures and runs a Kubernetes proxy server
type ProxyServer struct { type ProxyServerConfig struct {
BindAddress net.IP BindAddress net.IP
HealthzPort int HealthzPort int
HealthzBindAddress net.IP HealthzBindAddress net.IP
@ -58,7 +58,6 @@ type ProxyServer struct {
Master string Master string
Kubeconfig string Kubeconfig string
PortRange util.PortRange PortRange util.PortRange
Recorder record.EventRecorder
HostnameOverride string HostnameOverride string
ProxyMode string ProxyMode string
SyncPeriod time.Duration SyncPeriod time.Duration
@ -67,20 +66,20 @@ type ProxyServer struct {
CleanupAndExit bool CleanupAndExit bool
} }
// NewProxyServer creates a new ProxyServer object with default parameters type ProxyServer struct {
func NewProxyServer() *ProxyServer { Client *kubeclient.Client
return &ProxyServer{ Config *ProxyServerConfig
BindAddress: net.ParseIP("0.0.0.0"), EndpointsConfig *proxyconfig.EndpointsConfig
HealthzPort: 10249, EndpointsHandler proxyconfig.EndpointsConfigHandler
HealthzBindAddress: net.ParseIP("127.0.0.1"), IptInterface utiliptables.Interface
OOMScoreAdj: qos.KubeProxyOomScoreAdj, OomAdjuster *oom.OomAdjuster
ResourceContainer: "/kube-proxy", Proxier proxy.ProxyProvider
SyncPeriod: 5 * time.Second, Recorder record.EventRecorder
} ServiceConfig *proxyconfig.ServiceConfig
} }
// AddFlags adds flags for a specific ProxyServer to the specified FlagSet // AddFlags adds flags for a specific ProxyServer to the specified FlagSet
func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) { func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.IPVar(&s.BindAddress, "bind-address", s.BindAddress, "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") fs.IPVar(&s.BindAddress, "bind-address", s.BindAddress, "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.") fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.")
@ -110,87 +109,110 @@ func checkKnownProxyMode(proxyMode string) bool {
return false return false
} }
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set). func NewProxyConfig() *ProxyServerConfig {
func (s *ProxyServer) Run(_ []string) error { return &ProxyServerConfig{
BindAddress: net.ParseIP("0.0.0.0"),
HealthzPort: 10249,
HealthzBindAddress: net.ParseIP("127.0.0.1"),
OOMScoreAdj: qos.KubeProxyOomScoreAdj,
ResourceContainer: "/kube-proxy",
SyncPeriod: 5 * time.Second,
}
}
func NewProxyServer(
config *ProxyServerConfig,
client *kubeclient.Client,
endpointsConfig *proxyconfig.EndpointsConfig,
endpointsHandler proxyconfig.EndpointsConfigHandler,
iptInterface utiliptables.Interface,
oomAdjuster *oom.OomAdjuster,
proxier proxy.ProxyProvider,
recorder record.EventRecorder,
serviceConfig *proxyconfig.ServiceConfig,
) (*ProxyServer, error) {
return &ProxyServer{
Client: client,
Config: config,
EndpointsConfig: endpointsConfig,
EndpointsHandler: endpointsHandler,
IptInterface: iptInterface,
OomAdjuster: oomAdjuster,
Proxier: proxier,
Recorder: recorder,
ServiceConfig: serviceConfig,
}, nil
}
// NewProxyServerDefault creates a new ProxyServer object with default parameters.
func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) {
protocol := utiliptables.ProtocolIpv4 protocol := utiliptables.ProtocolIpv4
if s.BindAddress.To4() == nil { if config.BindAddress.To4() == nil {
protocol = utiliptables.ProtocolIpv6 protocol = utiliptables.ProtocolIpv6
} }
// remove iptables rules and exit // We ommit creation of pretty much everything if we run in cleanup mode
if s.CleanupAndExit { if config.CleanupAndExit {
execer := exec.New() execer := exec.New()
dbus := utildbus.New() dbus := utildbus.New()
ipt := utiliptables.New(execer, dbus, protocol) IptInterface := utiliptables.New(execer, dbus, protocol)
encounteredError := userspace.CleanupLeftovers(ipt) return &ProxyServer{
encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError IptInterface: IptInterface,
if encounteredError { }, nil
return errors.New("Encountered an error while tearing down rules.")
}
return nil
} }
// TODO(vmarmol): Use container config for this. // TODO(vmarmol): Use container config for this.
oomAdjuster := oom.NewOomAdjuster() var oomAdjuster *oom.OomAdjuster
if err := oomAdjuster.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil { if config.OOMScoreAdj != 0 {
glog.V(2).Info(err) oomAdjuster := oom.NewOomAdjuster()
if err := oomAdjuster.ApplyOomScoreAdj(0, config.OOMScoreAdj); err != nil {
glog.V(2).Info(err)
}
} }
// Run in its own container. if config.ResourceContainer != "" {
if err := util.RunInResourceContainer(s.ResourceContainer); err != nil { // Run in its own container.
glog.Warningf("Failed to start in resource-only container %q: %v", s.ResourceContainer, err) if err := util.RunInResourceContainer(config.ResourceContainer); err != nil {
} else { glog.Warningf("Failed to start in resource-only container %q: %v", config.ResourceContainer, err)
glog.V(2).Infof("Running in resource-only container %q", s.ResourceContainer) } else {
glog.V(2).Infof("Running in resource-only container %q", config.ResourceContainer)
}
} }
// Create a Kube Client
// define api config source // define api config source
if s.Kubeconfig == "" && s.Master == "" { if config.Kubeconfig == "" && config.Master == "" {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.") glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
} }
// This creates a client, first loading any specified kubeconfig // This creates a client, first loading any specified kubeconfig
// file, and then overriding the Master flag, if non-empty. // file, and then overriding the Master flag, if non-empty.
kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.Kubeconfig}, &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.Master}}).ClientConfig() &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.Master}}).ClientConfig()
if err != nil { if err != nil {
return err return nil, err
} }
client, err := kubeclient.New(kubeconfig)
client, err := client.New(kubeconfig)
if err != nil { if err != nil {
glog.Fatalf("Invalid API configuration: %v", err) glog.Fatalf("Invalid API configuration: %v", err)
} }
// Add event recorder // Create event recorder
hostname := nodeutil.GetHostname(s.HostnameOverride) hostname := nodeutil.GetHostname(config.HostnameOverride)
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
s.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname}) recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname})
eventBroadcaster.StartRecordingToSink(client.Events("")) eventBroadcaster.StartRecordingToSink(client.Events(""))
s.nodeRef = &api.ObjectReference{ // Create a iptables utils.
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
var proxier proxy.ProxyProvider
var endpointsHandler config.EndpointsConfigHandler
execer := exec.New() execer := exec.New()
dbus := utildbus.New() dbus := utildbus.New()
ipt := utiliptables.New(execer, dbus, protocol) iptInterface := utiliptables.New(execer, dbus, protocol)
if !checkKnownProxyMode(s.ProxyMode) { var proxier proxy.ProxyProvider
glog.Fatalf("Unknown proxy-mode flag: %s", s.ProxyMode) var endpointsHandler proxyconfig.EndpointsConfigHandler
}
useIptablesProxy := false useIptablesProxy := false
if mayTryIptablesProxy(s.ProxyMode, client.Nodes(), hostname) { if mayTryIptablesProxy(config.ProxyMode, client.Nodes(), hostname) {
var err error var err error
// guaranteed false on error, error only necessary for debugging // guaranteed false on error, error only necessary for debugging
useIptablesProxy, err = iptables.ShouldUseIptablesProxier() useIptablesProxy, err = iptables.ShouldUseIptablesProxier()
@ -201,8 +223,8 @@ func (s *ProxyServer) Run(_ []string) error {
if useIptablesProxy { if useIptablesProxy {
glog.V(2).Info("Using iptables Proxier.") glog.V(2).Info("Using iptables Proxier.")
execer := exec.New()
proxierIptables, err := iptables.NewProxier(ipt, execer, s.SyncPeriod, s.MasqueradeAll) proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.SyncPeriod, config.MasqueradeAll)
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
@ -210,7 +232,7 @@ func (s *ProxyServer) Run(_ []string) error {
endpointsHandler = proxierIptables endpointsHandler = proxierIptables
// No turning back. Remove artifacts that might still exist from the userspace Proxier. // No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.") glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.")
userspace.CleanupLeftovers(ipt) userspace.CleanupLeftovers(iptInterface)
} else { } else {
glog.V(2).Info("Using userspace Proxier.") glog.V(2).Info("Using userspace Proxier.")
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
@ -219,48 +241,71 @@ func (s *ProxyServer) Run(_ []string) error {
// set EndpointsConfigHandler to our loadBalancer // set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer endpointsHandler = loadBalancer
proxierUserspace, err := userspace.NewProxier(loadBalancer, s.BindAddress, ipt, s.PortRange, s.SyncPeriod) proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.SyncPeriod)
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
proxier = proxierUserspace proxier = proxierUserspace
// Remove artifacts from the pure-iptables Proxier. // Remove artifacts from the pure-iptables Proxier.
glog.V(2).Info("Tearing down pure-iptables proxy rules. Errors here are acceptable.") glog.V(2).Info("Tearing down pure-iptables proxy rules. Errors here are acceptable.")
iptables.CleanupLeftovers(ipt) iptables.CleanupLeftovers(iptInterface)
} }
iptInterface.AddReloadFunc(proxier.Sync)
// Birth Cry after the birth is successful // Create configs (i.e. Watches for Services and Endpoints)
s.birthCry()
// Wire proxier to handle changes to services
serviceConfig.RegisterHandler(proxier)
// And wire endpointsHandler to handle changes to endpoints to services
endpointsConfig.RegisterHandler(endpointsHandler)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources // Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers // only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet. // are registered yet.
serviceConfig := proxyconfig.NewServiceConfig()
serviceConfig.RegisterHandler(proxier)
config.NewSourceAPI( endpointsConfig := proxyconfig.NewEndpointsConfig()
endpointsConfig.RegisterHandler(endpointsHandler)
proxyconfig.NewSourceAPI(
client, client,
30*time.Second, 30*time.Second,
serviceConfig.Channel("api"), serviceConfig.Channel("api"),
endpointsConfig.Channel("api"), endpointsConfig.Channel("api"),
) )
if s.HealthzPort > 0 { config.nodeRef = &api.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}
return NewProxyServer(config, client, endpointsConfig, endpointsHandler, iptInterface, oomAdjuster, proxier, recorder, serviceConfig)
}
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
func (s *ProxyServer) Run(_ []string) error {
// remove iptables rules and exit
if s.Config.CleanupAndExit {
encounteredError := userspace.CleanupLeftovers(s.IptInterface)
encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
if encounteredError {
return errors.New("Encountered an error while tearing down rules.")
}
return nil
}
// Birth Cry after the birth is successful
s.birthCry()
// Start up Healthz service if requested
if s.Config.HealthzPort > 0 {
go util.Until(func() { go util.Until(func() {
err := http.ListenAndServe(s.HealthzBindAddress.String()+":"+strconv.Itoa(s.HealthzPort), nil) err := http.ListenAndServe(s.Config.HealthzBindAddress.String()+":"+strconv.Itoa(s.Config.HealthzPort), nil)
if err != nil { if err != nil {
glog.Errorf("Starting health server failed: %v", err) glog.Errorf("Starting health server failed: %v", err)
} }
}, 5*time.Second, util.NeverStop) }, 5*time.Second, util.NeverStop)
} }
ipt.AddReloadFunc(proxier.Sync)
// Just loop forever for now... // Just loop forever for now...
proxier.SyncLoop() s.Proxier.SyncLoop()
return nil return nil
} }
@ -303,5 +348,5 @@ func mayTryIptablesProxy(proxyMode string, client nodeGetter, hostname string) b
} }
func (s *ProxyServer) birthCry() { func (s *ProxyServer) birthCry() {
s.Recorder.Eventf(s.nodeRef, "Starting", "Starting kube-proxy.") s.Recorder.Eventf(s.Config.nodeRef, "Starting", "Starting kube-proxy.")
} }

View File

@ -35,8 +35,8 @@ func init() {
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
s := app.NewProxyServer() config := app.NewProxyConfig()
s.AddFlags(pflag.CommandLine) config.AddFlags(pflag.CommandLine)
util.InitFlags() util.InitFlags()
util.InitLogs() util.InitLogs()
@ -44,7 +44,13 @@ func main() {
verflag.PrintAndExitIfRequested() verflag.PrintAndExitIfRequested()
if err := s.Run(pflag.CommandLine.Args()); err != nil { s, err := app.NewProxyServerDefault(config)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
if err = s.Run(pflag.CommandLine.Args()); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err) fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1) os.Exit(1)
} }

View File

@ -18,25 +18,37 @@ limitations under the License.
package main package main
import ( import (
"fmt"
"os"
kubeproxy "k8s.io/kubernetes/cmd/kube-proxy/app" kubeproxy "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
) )
// NewKubeProxy creates a new hyperkube Server object that includes the // NewKubeProxy creates a new hyperkube Server object that includes the
// description and flags. // description and flags.
func NewKubeProxy() *Server { func NewKubeProxy() *Server {
s := kubeproxy.NewProxyServer() config := kubeproxy.NewProxyConfig()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.CommandProxy, SimpleUsage: hyperkube.CommandProxy,
Long: `The Kubernetes proxy server is responsible for taking traffic directed at Long: `The Kubernetes proxy server is responsible for taking traffic directed at
services and forwarding it to the appropriate pods. It generally runs on services and forwarding it to the appropriate pods. It generally runs on
nodes next to the Kubelet and proxies traffic from local pods to remote pods. nodes next to the Kubelet and proxies traffic from local pods to remote pods.
It is also used when handling incoming external traffic.`, It is also used when handling incoming external traffic.`,
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
} }
s.AddFlags(hks.Flags())
config.AddFlags(hks.Flags())
s, err := kubeproxy.NewProxyServerDefault(config)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
hks.Run = func(_ *Server, args []string) error {
return s.Run(args)
}
return &hks return &hks
} }