From b1475565e62f03dea7810e07752f07bc4f1ae0ab Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 7 Apr 2017 16:26:26 +0200 Subject: [PATCH] Edge-based iptables proxy --- cmd/kube-proxy/app/server.go | 2 +- pkg/proxy/iptables/proxier.go | 58 ++++++++++++---- pkg/proxy/iptables/proxier_test.go | 108 ++++++++++++++++------------- 3 files changed, 103 insertions(+), 65 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index f4df9202643..f4615be7287 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -250,7 +250,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err } proxier = proxierIPTables servicesHandler = proxierIPTables - endpointsHandler = proxierIPTables + endpointsEventHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") userspace.CleanupLeftovers(iptInterface) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9868c363c51..6d0b6387ff6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -195,8 +195,8 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se return info } +type endpointsMap map[types.NamespacedName]*api.Endpoints type proxyServiceMap map[proxy.ServicePortName]*serviceInfo - type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo // Proxier is an iptables based proxy for connections between a localhost:lport @@ -210,9 +210,14 @@ type Proxier struct { // pointers are shared with higher layers of kube-proxy. They are guaranteed // to not be modified in the meantime, but also require to be not modified // by Proxier. - // nil until we have seen an On*Update event. - allServices []*api.Service - allEndpoints []*api.Endpoints + allEndpoints endpointsMap + // allServices is nil until we have seen an OnServiceUpdate event. + allServices []*api.Service + + // endpointsSynced is set to true when endpoints are synced after startup. + // This is used to avoid updating iptables with some partial data after + // kube-proxy restart. + endpointsSynced bool throttle flowcontrol.RateLimiter @@ -327,6 +332,7 @@ func NewProxier(ipt utiliptables.Interface, serviceMap: make(proxyServiceMap), endpointsMap: make(proxyEndpointMap), portsMap: make(map[localPort]closeable), + allEndpoints: make(endpointsMap), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -531,19 +537,42 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { proxier.syncProxyRules(syncReasonServices) } -// OnEndpointsUpdate takes in a slice of updated endpoints. -func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { +func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.allEndpoints == nil { - glog.V(2).Info("Received first Endpoints update") - } - proxier.allEndpoints = allEndpoints + proxier.allEndpoints[namespacedName] = endpoints + proxier.syncProxyRules(syncReasonEndpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.allEndpoints[namespacedName] = endpoints + proxier.syncProxyRules(syncReasonEndpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + delete(proxier.allEndpoints, namespacedName) + proxier.syncProxyRules(syncReasonEndpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.endpointsSynced = true proxier.syncProxyRules(syncReasonEndpoints) } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { +func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { // return values newMap = make(proxyEndpointMap) @@ -551,8 +580,8 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap staleSet = make(map[endpointServicePair]bool) // Update endpoints for services. - for i := range allEndpoints { - accumulateEndpointsMap(allEndpoints[i], hostname, &newMap) + for _, endpoints := range allEndpoints { + accumulateEndpointsMap(endpoints, hostname, &newMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP @@ -607,7 +636,6 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap // NOTE: endpoints object should NOT be modified. // // TODO: this could be simplified: -// - hostPortInfo and endpointsInfo overlap too much // - the test for this is overlapped by the test for buildNewEndpointsMap // - naming is poor and responsibilities are muddled func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) { @@ -732,7 +760,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) }() // don't sync rules till we've received services and endpoints - if proxier.allEndpoints == nil || proxier.allServices == nil { + if !proxier.endpointsSynced || proxier.allServices == nil { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 9b40441ba71..42303009634 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -383,16 +383,17 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. return &Proxier{ - exec: &exec.FakeExec{}, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - iptables: ipt, - clusterCIDR: "10.0.0.0/24", - allEndpoints: []*api.Endpoints{}, - allServices: []*api.Service{}, - hostname: testHostname, - portsMap: make(map[localPort]closeable), - portMapper: &fakePortOpener{[]*localPort{}}, - healthChecker: newFakeHealthChecker(), + exec: &exec.FakeExec{}, + serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + iptables: ipt, + clusterCIDR: "10.0.0.0/24", + allEndpoints: make(endpointsMap), + allServices: []*api.Service{}, + endpointsSynced: true, + hostname: testHostname, + portsMap: make(map[localPort]closeable), + portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: newFakeHealthChecker(), } } @@ -611,7 +612,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { } epIP := "10.180.0.1" - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -623,7 +624,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -675,7 +676,7 @@ func TestLoadBalancer(t *testing.T) { } epIP := "10.180.0.1" - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -687,7 +688,7 @@ func TestLoadBalancer(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -732,7 +733,7 @@ func TestNodePort(t *testing.T) { } epIP := "10.180.0.1" - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -744,7 +745,7 @@ func TestNodePort(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -826,7 +827,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { epIP2 := "10.180.2.1" epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -842,7 +843,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -917,7 +918,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable epIP2 := "10.180.2.1" epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) - fp.allEndpoints = []*api.Endpoints{ + fp.allEndpoints = makeEndpointsMap( makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -933,7 +934,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }}, }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -1405,6 +1406,15 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap return ept } +func makeEndpointsMap(allEndpoints ...*api.Endpoints) endpointsMap { + result := make(endpointsMap) + for _, endpoints := range allEndpoints { + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + result[namespacedName] = endpoints + } + return result +} + func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } @@ -1420,21 +1430,21 @@ func Test_buildNewEndpointsMap(t *testing.T) { var nodeName = "host" testCases := []struct { - newEndpoints []*api.Endpoints + newEndpoints map[types.NamespacedName]*api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStale []endpointServicePair expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - newEndpoints: []*api.Endpoints{}, + newEndpoints: map[types.NamespacedName]*api.Endpoints{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStale: []endpointServicePair{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1445,7 +1455,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", false}, @@ -1460,7 +1470,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[2]: no change, named port, local - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1473,7 +1483,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", true}, @@ -1490,7 +1500,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[3]: no change, multiple subsets - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1510,7 +1520,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1531,7 +1541,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[4]: no change, multiple subsets, multiple ports, local - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1555,7 +1565,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", true}, @@ -1584,7 +1594,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1633,7 +1643,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1693,7 +1703,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[6]: add an Endpoints - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1705,7 +1715,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { @@ -1718,7 +1728,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[7]: remove an Endpoints - newEndpoints: []*api.Endpoints{ /* empty */ }, + newEndpoints: makeEndpointsMap(), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", true}, @@ -1732,7 +1742,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1750,7 +1760,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1772,7 +1782,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[9]: remove an IP and port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1784,7 +1794,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1813,7 +1823,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1834,7 +1844,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1854,7 +1864,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[11]: remove a subset - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1866,7 +1876,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1887,7 +1897,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1899,7 +1909,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1917,7 +1927,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1929,7 +1939,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1947,7 +1957,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove - newEndpoints: []*api.Endpoints{ + newEndpoints: makeEndpointsMap( makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1995,7 +2005,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }} }), - }, + ), oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false},