mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Event-based iptables proxy for services
This commit is contained in:
parent
e22476fd42
commit
7a647f9d1a
@ -219,7 +219,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
|
||||
|
||||
var proxier proxy.ProxyProvider
|
||||
var servicesHandler proxyconfig.ServiceConfigHandler
|
||||
var serviceEventHandler proxyconfig.ServiceHandler
|
||||
// TODO: Migrate all handlers to ServiceHandler types and
|
||||
// get rid of this one.
|
||||
var serviceHandler proxyconfig.ServiceConfigHandler
|
||||
var endpointsEventHandler proxyconfig.EndpointsHandler
|
||||
|
||||
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
|
||||
@ -246,7 +249,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
glog.Fatalf("Unable to create proxier: %v", err)
|
||||
}
|
||||
proxier = proxierIPTables
|
||||
servicesHandler = proxierIPTables
|
||||
serviceEventHandler = proxierIPTables
|
||||
endpointsEventHandler = proxierIPTables
|
||||
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
|
||||
glog.V(0).Info("Tearing down userspace rules.")
|
||||
@ -271,7 +274,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to create proxier: %v", err)
|
||||
}
|
||||
servicesHandler = proxierUserspace
|
||||
serviceHandler = proxierUserspace
|
||||
proxier = proxierUserspace
|
||||
} else {
|
||||
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
|
||||
@ -292,7 +295,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to create proxier: %v", err)
|
||||
}
|
||||
servicesHandler = proxierUserspace
|
||||
serviceHandler = proxierUserspace
|
||||
proxier = proxierUserspace
|
||||
}
|
||||
// Remove artifacts from the pure-iptables Proxier, if not on Windows.
|
||||
@ -314,7 +317,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
// only notify on changes, and the initial update (on process start) may be lost if no handlers
|
||||
// are registered yet.
|
||||
serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod)
|
||||
serviceConfig.RegisterHandler(servicesHandler)
|
||||
if serviceHandler != nil {
|
||||
serviceConfig.RegisterHandler(serviceHandler)
|
||||
}
|
||||
if serviceEventHandler != nil {
|
||||
serviceConfig.RegisterEventHandler(serviceEventHandler)
|
||||
}
|
||||
go serviceConfig.Run(wait.NeverStop)
|
||||
|
||||
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
|
||||
|
@ -20,7 +20,6 @@ go_library(
|
||||
"//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library",
|
||||
"//pkg/client/listers/core/internalversion:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/util/config:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
|
||||
listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/util/config"
|
||||
)
|
||||
|
||||
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
|
||||
@ -47,7 +46,7 @@ type ServiceConfigHandler interface {
|
||||
OnServiceUpdate(services []*api.Service)
|
||||
}
|
||||
|
||||
// ServiceHandler is an abstract interface of objects whic receive
|
||||
// ServiceHandler is an abstract interface of objects which receive
|
||||
// notifications about service object changes.
|
||||
type ServiceHandler interface {
|
||||
// OnServiceAdd is called whenever creation of new service object
|
||||
|
@ -196,6 +196,7 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
|
||||
}
|
||||
|
||||
type endpointsMap map[types.NamespacedName]*api.Endpoints
|
||||
type serviceMap map[types.NamespacedName]*api.Service
|
||||
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
|
||||
type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
|
||||
|
||||
@ -211,13 +212,13 @@ type Proxier struct {
|
||||
// to not be modified in the meantime, but also require to be not modified
|
||||
// by Proxier.
|
||||
allEndpoints endpointsMap
|
||||
// allServices is nil until we have seen an OnServiceUpdate event.
|
||||
allServices []*api.Service
|
||||
allServices serviceMap
|
||||
|
||||
// endpointsSynced is set to true when endpoints are synced after startup.
|
||||
// This is used to avoid updating iptables with some partial data after
|
||||
// kube-proxy restart.
|
||||
// endpointsSynced and servicesSynced are set to true when corresponding
|
||||
// objects are synced after startup. This is used to avoid updating iptables
|
||||
// with some partial data after kube-proxy restart.
|
||||
endpointsSynced bool
|
||||
servicesSynced bool
|
||||
|
||||
throttle flowcontrol.RateLimiter
|
||||
|
||||
@ -333,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
endpointsMap: make(proxyEndpointMap),
|
||||
portsMap: make(map[localPort]closeable),
|
||||
allEndpoints: make(endpointsMap),
|
||||
allServices: make(serviceMap),
|
||||
syncPeriod: syncPeriod,
|
||||
minSyncPeriod: minSyncPeriod,
|
||||
throttle: throttle,
|
||||
@ -457,7 +459,7 @@ func (proxier *Proxier) SyncLoop() {
|
||||
// Accepts a list of Services and the existing service map. Returns the new
|
||||
// service map, a map of healthcheck ports, and a set of stale UDP
|
||||
// services.
|
||||
func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
|
||||
func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
|
||||
newServiceMap := make(proxyServiceMap)
|
||||
hcPorts := make(map[types.NamespacedName]uint16)
|
||||
|
||||
@ -525,15 +527,37 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa
|
||||
return newServiceMap, hcPorts, staleUDPServices
|
||||
}
|
||||
|
||||
// OnServiceUpdate tracks the active set of service proxies.
|
||||
// They will be synchronized using syncProxyRules()
|
||||
func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
|
||||
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
if proxier.allServices == nil {
|
||||
glog.V(2).Info("Received first Services update")
|
||||
}
|
||||
proxier.allServices = allServices
|
||||
proxier.allServices[namespacedName] = service
|
||||
proxier.syncProxyRules(syncReasonServices)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
proxier.allServices[namespacedName] = service
|
||||
proxier.syncProxyRules(syncReasonServices)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
delete(proxier.allServices, namespacedName)
|
||||
proxier.syncProxyRules(syncReasonServices)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceSynced() {
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
proxier.servicesSynced = true
|
||||
proxier.syncProxyRules(syncReasonServices)
|
||||
}
|
||||
|
||||
|
@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
||||
iptables: ipt,
|
||||
clusterCIDR: "10.0.0.0/24",
|
||||
allEndpoints: make(endpointsMap),
|
||||
allServices: []*api.Service{},
|
||||
allServices: make(serviceMap),
|
||||
endpointsSynced: true,
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[localPort]closeable),
|
||||
@ -567,7 +567,7 @@ func TestClusterIPReject(t *testing.T) {
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) {
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.Ports = []api.ServicePort{{
|
||||
@ -576,7 +576,7 @@ func TestClusterIPReject(t *testing.T) {
|
||||
Protocol: api.ProtocolTCP,
|
||||
}}
|
||||
}),
|
||||
}
|
||||
)
|
||||
fp.syncProxyRules(syncReasonForce)
|
||||
|
||||
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
|
||||
@ -600,7 +600,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.Ports = []api.ServicePort{{
|
||||
@ -609,7 +609,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
|
||||
Protocol: api.ProtocolTCP,
|
||||
}}
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
epIP := "10.180.0.1"
|
||||
fp.allEndpoints = makeEndpointsMap(
|
||||
@ -659,7 +659,7 @@ func TestLoadBalancer(t *testing.T) {
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
||||
svc.Spec.Type = "LoadBalancer"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
@ -673,7 +673,7 @@ func TestLoadBalancer(t *testing.T) {
|
||||
IP: svcLBIP,
|
||||
}}
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
epIP := "10.180.0.1"
|
||||
fp.allEndpoints = makeEndpointsMap(
|
||||
@ -719,7 +719,7 @@ func TestNodePort(t *testing.T) {
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
||||
svc.Spec.Type = "NodePort"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
@ -730,7 +730,7 @@ func TestNodePort(t *testing.T) {
|
||||
NodePort: int32(svcNodePort),
|
||||
}}
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
epIP := "10.180.0.1"
|
||||
fp.allEndpoints = makeEndpointsMap(
|
||||
@ -769,7 +769,7 @@ func TestNodePortReject(t *testing.T) {
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
||||
svc.Spec.Type = "NodePort"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
@ -780,7 +780,7 @@ func TestNodePortReject(t *testing.T) {
|
||||
NodePort: int32(svcNodePort),
|
||||
}}
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
fp.syncProxyRules(syncReasonForce)
|
||||
|
||||
@ -806,7 +806,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
||||
svc.Spec.Type = "LoadBalancer"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
@ -821,7 +821,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
|
||||
}}
|
||||
svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
epIP1 := "10.180.0.1"
|
||||
epIP2 := "10.180.2.1"
|
||||
@ -900,7 +900,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
|
||||
Port: "p80",
|
||||
}
|
||||
|
||||
fp.allServices = []*api.Service{
|
||||
fp.allServices = makeServiceMap(
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
||||
svc.Spec.Type = "NodePort"
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
@ -912,7 +912,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
|
||||
}}
|
||||
svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
epIP1 := "10.180.0.1"
|
||||
epIP2 := "10.180.2.1"
|
||||
@ -992,7 +992,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
|
||||
}
|
||||
|
||||
func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
services := []*api.Service{
|
||||
services := makeServiceMap(
|
||||
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
|
||||
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = "172.16.55.4"
|
||||
@ -1033,7 +1033,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
},
|
||||
}
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
|
||||
if len(serviceMap) != 8 {
|
||||
@ -1056,8 +1056,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
services = []*api.Service{services[0]}
|
||||
services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
|
||||
oneService := services[makeNSN("somewhere-else", "cluster-ip")]
|
||||
oneService.Spec.Ports = []api.ServicePort{oneService.Spec.Ports[1]}
|
||||
services = makeServiceMap(oneService)
|
||||
serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap)
|
||||
if len(serviceMap) != 1 {
|
||||
t.Errorf("expected service map length 1, got %v", serviceMap)
|
||||
@ -1082,13 +1083,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
services := []*api.Service{
|
||||
services := makeServiceMap(
|
||||
makeTestService("somewhere-else", "headless", func(svc *api.Service) {
|
||||
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = api.ClusterIPNone
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
// Headless service should be ignored
|
||||
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
|
||||
@ -1107,14 +1108,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
services := []*api.Service{
|
||||
services := makeServiceMap(
|
||||
makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
|
||||
svc.Spec.Type = api.ServiceTypeExternalName
|
||||
svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
|
||||
svc.Spec.ExternalName = "foo2.bar.com"
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
|
||||
if len(serviceMap) != 0 {
|
||||
@ -1130,16 +1131,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
first := []*api.Service{
|
||||
first := makeServiceMap(
|
||||
makeTestService("somewhere", "some-service", func(svc *api.Service) {
|
||||
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = "172.16.55.4"
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
second := []*api.Service{
|
||||
second := makeServiceMap(
|
||||
makeTestService("somewhere", "some-service", func(svc *api.Service) {
|
||||
svc.ObjectMeta.Annotations = map[string]string{
|
||||
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
|
||||
@ -1156,7 +1157,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
},
|
||||
}
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap))
|
||||
if len(serviceMap) != 2 {
|
||||
@ -1426,6 +1427,15 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName {
|
||||
}
|
||||
}
|
||||
|
||||
func makeServiceMap(allServices ...*api.Service) serviceMap {
|
||||
result := make(serviceMap)
|
||||
for _, service := range allServices {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
result[namespacedName] = service
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func Test_buildNewEndpointsMap(t *testing.T) {
|
||||
var nodeName = "host"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user