diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index c91f93244fc..e1a4b7d015c 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -501,8 +501,6 @@ type ProxyServer struct { OOMScoreAdj *int32 ResourceContainer string ConfigSyncPeriod time.Duration - ServiceEventHandler config.ServiceHandler - EndpointsEventHandler config.EndpointsHandler HealthzServer *healthcheck.HealthzServer } @@ -657,11 +655,11 @@ func (s *ProxyServer) Run() error { // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod) - serviceConfig.RegisterEventHandler(s.ServiceEventHandler) + serviceConfig.RegisterEventHandler(s.Proxier) go serviceConfig.Run(wait.NeverStop) endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) - endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler) + endpointsConfig.RegisterEventHandler(s.Proxier) go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 84d5ea0d07d..0f0832c9bc4 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/ipvs" @@ -135,8 +134,6 @@ func newProxyServer( } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{}) nodeIP := net.ParseIP(config.BindAddress) @@ -151,7 +148,7 @@ func newProxyServer( } // TODO this has side effects that should only happen when Run() is invoked. - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, utilsysctl.New(), execer, @@ -170,12 +167,9 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPTables - serviceEventHandler = proxierIPTables - endpointsEventHandler = proxierIPTables } else if proxyMode == proxyModeIPVS { klog.V(0).Info("Using ipvs Proxier.") - proxierIPVS, err := ipvs.NewProxier( + proxier, err = ipvs.NewProxier( iptInterface, ipvsInterface, ipsetInterface, @@ -199,20 +193,12 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPVS - serviceEventHandler = proxierIPVS - endpointsEventHandler = proxierIPVS } else { klog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer // TODO this has side effects that should only happen when Run() is invoked. - proxierUserspace, err := userspace.NewProxier( - loadBalancer, + proxier, err = userspace.NewProxier( + userspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), iptInterface, execer, @@ -225,8 +211,6 @@ func newProxyServer( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceEventHandler = proxierUserspace - proxier = proxierUserspace } iptInterface.AddReloadFunc(proxier.Sync) @@ -250,8 +234,6 @@ func newProxyServer( OOMScoreAdj: config.OOMScoreAdj, ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil } diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index 596796cbbe1..97bce3828f1 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/winkernel" "k8s.io/kubernetes/pkg/proxy/winuserspace" @@ -94,13 +93,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), winkernel.WindowsKernelCompatTester{}) if proxyMode == proxyModeKernelspace { klog.V(0).Info("Using Kernelspace Proxier.") - proxierKernelspace, err := winkernel.NewProxier( + proxier, err = winkernel.NewProxier( config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration, config.IPTables.MasqueradeAll, @@ -115,23 +112,14 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierKernelspace - endpointsEventHandler = proxierKernelspace - serviceEventHandler = proxierKernelspace } else { klog.V(0).Info("Using userspace Proxier.") execer := exec.New() var netshInterface utilnetsh.Interface netshInterface = utilnetsh.New(execer) - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := winuserspace.NewLoadBalancerRR() - - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer - proxierUserspace, err := winuserspace.NewProxier( - loadBalancer, + proxier, err = winuserspace.NewProxier( + winuserspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), netshInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), @@ -142,26 +130,22 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierUserspace - serviceEventHandler = proxierUserspace } return &ProxyServer{ - Client: client, - EventClient: eventClient, - Proxier: proxier, - Broadcaster: eventBroadcaster, - Recorder: recorder, - ProxyMode: proxyMode, - NodeRef: nodeRef, - MetricsBindAddress: config.MetricsBindAddress, - EnableProfiling: config.EnableProfiling, - OOMScoreAdj: config.OOMScoreAdj, - ResourceContainer: config.ResourceContainer, - ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, - HealthzServer: healthzServer, + Client: client, + EventClient: eventClient, + Proxier: proxier, + Broadcaster: eventBroadcaster, + Recorder: recorder, + ProxyMode: proxyMode, + NodeRef: nodeRef, + MetricsBindAddress: config.MetricsBindAddress, + EnableProfiling: config.EnableProfiling, + OOMScoreAdj: config.OOMScoreAdj, + ResourceContainer: config.ResourceContainer, + ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, + HealthzServer: healthzServer, }, nil } diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index ced440d7bc6..1b689f13157 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -26,7 +26,6 @@ go_library( "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/proxy:go_default_library", - "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 5e4a7ec8897..600a97b295b 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -27,7 +27,6 @@ import ( "k8s.io/client-go/tools/record" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" "k8s.io/kubernetes/pkg/proxy" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -72,13 +71,12 @@ func NewHollowProxyOrDie( ) (*HollowProxy, error) { // Create proxier and service/endpoint handlers. var proxier proxy.ProxyProvider - var serviceHandler proxyconfig.ServiceHandler - var endpointsHandler proxyconfig.EndpointsHandler + var err error if useRealProxier { // Real proxier with fake iptables, sysctl, etc underneath it. //var err error - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, sysctl, execer, @@ -96,13 +94,8 @@ func NewHollowProxyOrDie( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierIPTables - serviceHandler = proxierIPTables - endpointsHandler = proxierIPTables } else { proxier = &FakeProxier{} - serviceHandler = &FakeProxier{} - endpointsHandler = &FakeProxier{} } // Create a Hollow Proxy instance. @@ -114,19 +107,17 @@ func NewHollowProxyOrDie( } return &HollowProxy{ ProxyServer: &proxyapp.ProxyServer{ - Client: client, - EventClient: eventClient, - IptInterface: iptInterface, - Proxier: proxier, - Broadcaster: broadcaster, - Recorder: recorder, - ProxyMode: "fake", - NodeRef: nodeRef, - OOMScoreAdj: utilpointer.Int32Ptr(0), - ResourceContainer: "", - ConfigSyncPeriod: 30 * time.Second, - ServiceEventHandler: serviceHandler, - EndpointsEventHandler: endpointsHandler, + Client: client, + EventClient: eventClient, + IptInterface: iptInterface, + Proxier: proxier, + Broadcaster: broadcaster, + Recorder: recorder, + ProxyMode: "fake", + NodeRef: nodeRef, + OOMScoreAdj: utilpointer.Int32Ptr(0), + ResourceContainer: "", + ConfigSyncPeriod: 30 * time.Second, }, }, nil } diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index a7969c25c86..b43afc322c3 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -17,6 +17,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/proxy", deps = [ "//pkg/api/v1/service:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index b7d8b1654a9..410603db195 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -21,10 +21,14 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/proxy/config" ) // ProxyProvider is the interface provided by proxier implementations. type ProxyProvider interface { + config.EndpointsHandler + config.ServiceHandler + // Sync immediately synchronizes the ProxyProvider's current state to proxy rules. Sync() // SyncLoop runs periodic work. diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index b7887c85b5d..b378147969f 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -21,6 +21,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index f0e5bcaa694..0b853477851 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -19,6 +19,7 @@ package userspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -31,4 +32,6 @@ type LoadBalancer interface { DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) ServiceHasEndpoints(service proxy.ServicePortName) bool + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 661092b1b2b..52f8586b656 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -526,6 +526,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index 3925dc96d72..8bad75c386b 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -19,6 +19,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/util/ipconfig:go_default_library", "//pkg/util/netsh:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/proxy/winuserspace/loadbalancer.go b/pkg/proxy/winuserspace/loadbalancer.go index e7cb770e930..d96a2951f48 100644 --- a/pkg/proxy/winuserspace/loadbalancer.go +++ b/pkg/proxy/winuserspace/loadbalancer.go @@ -19,6 +19,7 @@ package winuserspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -30,4 +31,6 @@ type LoadBalancer interface { NewService(service proxy.ServicePortName, sessionAffinityType v1.ServiceAffinity, stickyMaxAgeMinutes int) error DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 4b5a218cfc0..91cbd6337c8 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -444,6 +444,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool { return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity }