Edge-based iptables proxy

This commit is contained in:
Wojciech Tyczynski 2017-04-07 16:26:26 +02:00
parent c5cbdbe3d2
commit b1475565e6
3 changed files with 103 additions and 65 deletions

View File

@ -250,7 +250,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
}
proxier = proxierIPTables
servicesHandler = proxierIPTables
endpointsHandler = proxierIPTables
endpointsEventHandler = proxierIPTables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface)

View File

@ -195,8 +195,8 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
return info
}
type endpointsMap map[types.NamespacedName]*api.Endpoints
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
// Proxier is an iptables based proxy for connections between a localhost:lport
@ -210,9 +210,14 @@ type Proxier struct {
// pointers are shared with higher layers of kube-proxy. They are guaranteed
// to not be modified in the meantime, but also require to be not modified
// by Proxier.
// nil until we have seen an On*Update event.
allEndpoints endpointsMap
// allServices is nil until we have seen an OnServiceUpdate event.
allServices []*api.Service
allEndpoints []*api.Endpoints
// endpointsSynced is set to true when endpoints are synced after startup.
// This is used to avoid updating iptables with some partial data after
// kube-proxy restart.
endpointsSynced bool
throttle flowcontrol.RateLimiter
@ -327,6 +332,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointMap),
portsMap: make(map[localPort]closeable),
allEndpoints: make(endpointsMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
@ -531,19 +537,42 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
proxier.syncProxyRules(syncReasonServices)
}
// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
if proxier.allEndpoints == nil {
glog.V(2).Info("Received first Endpoints update")
proxier.allEndpoints[namespacedName] = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}
proxier.allEndpoints = allEndpoints
func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allEndpoints[namespacedName] = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
delete(proxier.allEndpoints, namespacedName)
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
proxier.syncProxyRules(syncReasonEndpoints)
}
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
// return values
newMap = make(proxyEndpointMap)
@ -551,8 +580,8 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
staleSet = make(map[endpointServicePair]bool)
// Update endpoints for services.
for i := range allEndpoints {
accumulateEndpointsMap(allEndpoints[i], hostname, &newMap)
for _, endpoints := range allEndpoints {
accumulateEndpointsMap(endpoints, hostname, &newMap)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
@ -607,7 +636,6 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
// NOTE: endpoints object should NOT be modified.
//
// TODO: this could be simplified:
// - hostPortInfo and endpointsInfo overlap too much
// - the test for this is overlapped by the test for buildNewEndpointsMap
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
@ -732,7 +760,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if proxier.allEndpoints == nil || proxier.allServices == nil {
if !proxier.endpointsSynced || proxier.allServices == nil {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}

View File

@ -387,8 +387,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
allEndpoints: []*api.Endpoints{},
allEndpoints: make(endpointsMap),
allServices: []*api.Service{},
endpointsSynced: true,
hostname: testHostname,
portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}},
@ -611,7 +612,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
}
epIP := "10.180.0.1"
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -623,7 +624,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -675,7 +676,7 @@ func TestLoadBalancer(t *testing.T) {
}
epIP := "10.180.0.1"
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -687,7 +688,7 @@ func TestLoadBalancer(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -732,7 +733,7 @@ func TestNodePort(t *testing.T) {
}
epIP := "10.180.0.1"
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -744,7 +745,7 @@ func TestNodePort(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -826,7 +827,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
epIP2 := "10.180.2.1"
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -842,7 +843,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -917,7 +918,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
epIP2 := "10.180.2.1"
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -933,7 +934,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -1405,6 +1406,15 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap
return ept
}
func makeEndpointsMap(allEndpoints ...*api.Endpoints) endpointsMap {
result := make(endpointsMap)
for _, endpoints := range allEndpoints {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
result[namespacedName] = endpoints
}
return result
}
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}
@ -1420,21 +1430,21 @@ func Test_buildNewEndpointsMap(t *testing.T) {
var nodeName = "host"
testCases := []struct {
newEndpoints []*api.Endpoints
newEndpoints map[types.NamespacedName]*api.Endpoints
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedResult map[proxy.ServicePortName][]*endpointsInfo
expectedStale []endpointServicePair
expectedHealthchecks map[types.NamespacedName]int
}{{
// Case[0]: nothing
newEndpoints: []*api.Endpoints{},
newEndpoints: map[types.NamespacedName]*api.Endpoints{},
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[1]: no change, unnamed port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1445,7 +1455,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false},
@ -1460,7 +1470,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[2]: no change, named port, local
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1473,7 +1483,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true},
@ -1490,7 +1500,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[3]: no change, multiple subsets
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1510,7 +1520,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1531,7 +1541,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[4]: no change, multiple subsets, multiple ports, local
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1555,7 +1565,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true},
@ -1584,7 +1594,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[5]: no change, multiple endpoints, subsets, IPs, and ports
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1633,7 +1643,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1693,7 +1703,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[6]: add an Endpoints
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1705,7 +1715,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
@ -1718,7 +1728,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[7]: remove an Endpoints
newEndpoints: []*api.Endpoints{ /* empty */ },
newEndpoints: makeEndpointsMap(),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", true},
@ -1732,7 +1742,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[8]: add an IP and port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1750,7 +1760,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1772,7 +1782,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[9]: remove an IP and port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1784,7 +1794,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1813,7 +1823,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[10]: add a subset
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1834,7 +1844,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1854,7 +1864,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[11]: remove a subset
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1866,7 +1876,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1887,7 +1897,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[12]: rename a port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1899,7 +1909,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1917,7 +1927,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[13]: renumber a port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1929,7 +1939,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1947,7 +1957,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[14]: complex add and remove
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1995,7 +2005,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},