mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-06 07:57:35 +00:00
Avoid copying endpoints object in kube-proxy
This commit is contained in:
@@ -197,9 +197,14 @@ type Proxier struct {
|
||||
serviceMap proxyServiceMap
|
||||
endpointsMap proxyEndpointMap
|
||||
portsMap map[localPort]closeable
|
||||
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
||||
allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event
|
||||
throttle flowcontrol.RateLimiter
|
||||
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
||||
// allEndpoints should never be modified by proxier - the pointers
|
||||
// are shared with higher layers of kube-proxy. They are guaranteed
|
||||
// to not be modified in the meantime, but also require to be not
|
||||
// modified by Proxier.
|
||||
// nil until we have seen an OnEndpointsUpdate event.
|
||||
allEndpoints []*api.Endpoints
|
||||
throttle flowcontrol.RateLimiter
|
||||
|
||||
// These are effectively const and do not need the mutex to be held.
|
||||
syncPeriod time.Duration
|
||||
@@ -559,7 +564,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||
}
|
||||
|
||||
// OnEndpointsUpdate takes in a slice of updated endpoints.
|
||||
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
if proxier.allEndpoints == nil {
|
||||
@@ -580,7 +585,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
}
|
||||
|
||||
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
|
||||
func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, hostname string,
|
||||
func updateEndpoints(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string,
|
||||
healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) {
|
||||
|
||||
// return values
|
||||
@@ -589,7 +594,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, host
|
||||
|
||||
// Update endpoints for services.
|
||||
for i := range allEndpoints {
|
||||
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap)
|
||||
accumulateEndpointsMap(allEndpoints[i], hostname, curMap, &newMap)
|
||||
}
|
||||
// Check stale connections against endpoints missing from the update.
|
||||
// TODO: we should really only mark a connection stale if the proto was UDP
|
||||
@@ -630,6 +635,8 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, host
|
||||
// scope - it only knows one Endpoints, but sees the whole current map. That
|
||||
// cleanup has to be done above.
|
||||
//
|
||||
// NOTE: endpoints object should NOT be modified.
|
||||
//
|
||||
// TODO: this could be simplified:
|
||||
// - hostPortInfo and endpointsInfo overlap too much
|
||||
// - the test for this is overlapped by the test for updateEndpoints
|
||||
|
||||
@@ -369,7 +369,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
||||
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
||||
iptables: ipt,
|
||||
clusterCIDR: "10.0.0.0/24",
|
||||
allEndpoints: []api.Endpoints{},
|
||||
allEndpoints: []*api.Endpoints{},
|
||||
haveReceivedServiceUpdate: true,
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[localPort]closeable),
|
||||
@@ -570,7 +570,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
|
||||
ip := "10.180.0.1"
|
||||
port := 80
|
||||
ep := fmt.Sprintf("%s:%d", ip, port)
|
||||
allEndpoints := []api.Endpoints{
|
||||
allEndpoints := []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -624,7 +624,7 @@ func TestLoadBalancer(t *testing.T) {
|
||||
|
||||
ip := "10.180.0.1"
|
||||
port := 80
|
||||
fp.allEndpoints = []api.Endpoints{
|
||||
fp.allEndpoints = []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -669,7 +669,7 @@ func TestNodePort(t *testing.T) {
|
||||
|
||||
ip := "10.180.0.1"
|
||||
port := 80
|
||||
fp.allEndpoints = []api.Endpoints{
|
||||
fp.allEndpoints = []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -732,7 +732,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
|
||||
port := 80
|
||||
nonLocalEp := fmt.Sprintf("%s:%d", ip1, port)
|
||||
localEp := fmt.Sprintf("%s:%d", ip2, port)
|
||||
allEndpoints := []api.Endpoints{
|
||||
allEndpoints := []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -810,7 +810,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
|
||||
port := 80
|
||||
nonLocalEp := fmt.Sprintf("%s:%d", ip1, port)
|
||||
localEp := fmt.Sprintf("%s:%d", ip2, port)
|
||||
allEndpoints := []api.Endpoints{
|
||||
allEndpoints := []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1147,7 +1147,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
// This is a coarse test, but it offers some modicum of confidence as the code is evolved.
|
||||
func Test_accumulateEndpointsMap(t *testing.T) {
|
||||
testCases := []struct {
|
||||
newEndpoints api.Endpoints
|
||||
newEndpoints *api.Endpoints
|
||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedNew map[proxy.ServicePortName][]*endpointsInfo
|
||||
}{{
|
||||
@@ -1356,7 +1356,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
|
||||
for tci, tc := range testCases {
|
||||
// outputs
|
||||
newEndpoints := make(proxyEndpointMap)
|
||||
accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints)
|
||||
accumulateEndpointsMap(tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints)
|
||||
|
||||
if len(newEndpoints) != len(tc.expectedNew) {
|
||||
t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints))
|
||||
@@ -1375,14 +1375,14 @@ func Test_accumulateEndpointsMap(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) api.Endpoints {
|
||||
ept := api.Endpoints{
|
||||
func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *api.Endpoints {
|
||||
ept := &api.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
},
|
||||
}
|
||||
eptFunc(&ept)
|
||||
eptFunc(ept)
|
||||
return ept
|
||||
}
|
||||
|
||||
@@ -1398,19 +1398,19 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName {
|
||||
|
||||
func Test_updateEndpoints(t *testing.T) {
|
||||
testCases := []struct {
|
||||
newEndpoints []api.Endpoints
|
||||
newEndpoints []*api.Endpoints
|
||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedStale []endpointServicePair
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
newEndpoints: []api.Endpoints{},
|
||||
newEndpoints: []*api.Endpoints{},
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[1]: no change, unnamed port
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1435,7 +1435,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[2]: no change, named port
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1461,7 +1461,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[3]: no change, multiple subsets
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1501,7 +1501,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[4]: no change, multiple subsets, multiple ports
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1550,7 +1550,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[5]: no change, multiple endpoints, subsets, IPs, and ports
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1652,7 +1652,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[6]: add an Endpoints
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1673,7 +1673,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[7]: remove an Endpoints
|
||||
newEndpoints: []api.Endpoints{ /* empty */ },
|
||||
newEndpoints: []*api.Endpoints{ /* empty */ },
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
makeServicePortName("ns1", "ep1", ""): {
|
||||
{"1.1.1.1:11", false},
|
||||
@@ -1686,7 +1686,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
}},
|
||||
}, {
|
||||
// Case[8]: add an IP and port
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1722,7 +1722,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[9]: remove an IP and port
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1762,7 +1762,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
}},
|
||||
}, {
|
||||
// Case[10]: add a subset
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1799,7 +1799,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
expectedStale: []endpointServicePair{},
|
||||
}, {
|
||||
// Case[11]: remove a subset
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1831,7 +1831,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
}},
|
||||
}, {
|
||||
// Case[12]: rename a port
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1860,7 +1860,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
}},
|
||||
}, {
|
||||
// Case[13]: renumber a port
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1889,7 +1889,7 @@ func Test_updateEndpoints(t *testing.T) {
|
||||
}},
|
||||
}, {
|
||||
// Case[14]: complex add and remove
|
||||
newEndpoints: []api.Endpoints{
|
||||
newEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
|
||||
Reference in New Issue
Block a user