diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index f8ee5b30ab5..c22d5369de8 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -614,16 +614,45 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap } } - // Update service health check - allSvcPorts := make(map[proxy.ServicePortName]bool) + if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { + return + } + + // Update service health check. We include entries from the current map, + // with zero-length value, to trigger the healthchecker to stop reporting + // health for that service. + // + // This whole mechanism may be over-designed. It builds a list of endpoints + // per service, filters for local endpoints, builds a string that is the + // same as the name, and then passes each (name, list) pair over a channel. + // + // I am pretty sure that there's no way there can be more than one entry in + // the final list, and passing an empty list as a delete signal is weird. + // It could probably be simplified to a synchronous function call of a set + // of NamespacedNames. I am not making that simplification at this time. + // + // ServicePortName includes the port name, which doesn't matter for + // healthchecks. It's possible that a single update both added and removed + // ports on the same IP, so we need to make sure that removals are counted, + // with additions overriding them. Track all endpoints so we can find local + // ones. + epsBySvcName := map[types.NamespacedName][]*endpointsInfo{} for svcPort := range curMap { - allSvcPorts[svcPort] = true + epsBySvcName[svcPort.NamespacedName] = nil } for svcPort := range newMap { - allSvcPorts[svcPort] = true + epsBySvcName[svcPort.NamespacedName] = append(epsBySvcName[svcPort.NamespacedName], newMap[svcPort]...) } - for svcPort := range allSvcPorts { - updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker) + for nsn, eps := range epsBySvcName { + // Use a set instead of a slice to provide deduplication + epSet := sets.NewString() + for _, ep := range eps { + if ep.isLocal { + // kube-proxy health check only needs local endpoints + epSet.Insert(fmt.Sprintf("%s/%s", nsn.Namespace, nsn.Name)) + } + } + healthChecker.UpdateEndpoints(nsn, epSet) } return newMap, staleSet @@ -674,23 +703,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, } } -// updateHealthCheckEntries - send the new set of local endpoints to the health checker -func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) { - if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { - return - } - - // Use a set instead of a slice to provide deduplication - epSet := sets.NewString() - for _, portInfo := range endpoints { - if portInfo.isLocal { - // kube-proxy health check only needs local endpoints - epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) - } - } - healthChecker.UpdateEndpoints(name, epSet) -} - // portProtoHash takes the ServicePortName and protocol for a service // returns the associated 16 character hash. This is computed by hashing (sha256) // then encoding to base32 and truncating to 16 chars. We do this because IPTables diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 016c7dba445..a8c77210355 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -17,6 +17,7 @@ limitations under the License. package iptables import ( + "reflect" "strconv" "testing" @@ -355,9 +356,23 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return nil, nil } -type fakeHealthChecker struct{} +type fakeHealthChecker struct { + endpoints map[types.NamespacedName]sets.String +} -func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {} +func newFakeHealthChecker() *fakeHealthChecker { + return &fakeHealthChecker{ + endpoints: map[types.NamespacedName]sets.String{}, + } +} + +func (fake *fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { + if len(endpointUIDs) == 0 { + delete(fake.endpoints, serviceName) + } else { + fake.endpoints[serviceName] = endpointUIDs + } +} const testHostname = "test-hostname" @@ -374,7 +389,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { hostname: testHostname, portsMap: make(map[localPort]closeable), portMapper: &fakePortOpener{[]*localPort{}}, - healthChecker: fakeHealthChecker{}, + healthChecker: newFakeHealthChecker(), } } @@ -1397,17 +1412,21 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName { } func Test_buildNewEndpointsMap(t *testing.T) { + var nodeName = "host" + testCases := []struct { - newEndpoints []*api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStale []endpointServicePair + newEndpoints []*api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedStale []endpointServicePair + expectedHealthchecks map[types.NamespacedName]sets.String }{{ // Case[0]: nothing - newEndpoints: []*api.Endpoints{}, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{}, + newEndpoints: []*api.Endpoints{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { // Case[1]: no change, unnamed port newEndpoints: []*api.Endpoints{ @@ -1432,14 +1451,16 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { - // Case[2]: no change, named port + // Case[2]: no change, named port, local newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", + IP: "1.1.1.1", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1450,15 +1471,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + }, }, { // Case[3]: no change, multiple subsets newEndpoints: []*api.Endpoints{ @@ -1498,14 +1522,16 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.2:12", false}, }, }, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { - // Case[4]: no change, multiple subsets, multiple ports + // Case[4]: no change, multiple subsets, multiple ports, local newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", + IP: "1.1.1.1", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1527,10 +1553,10 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, + {"1.1.1.1:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, @@ -1538,16 +1564,19 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, + {"1.1.1.1:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + }, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports newEndpoints: []*api.Endpoints{ @@ -1556,7 +1585,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "1.1.1.1", }, { - IP: "1.1.1.2", + IP: "1.1.1.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1569,7 +1599,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "1.1.1.3", }, { - IP: "1.1.1.4", + IP: "1.1.1.4", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p13", @@ -1585,7 +1616,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "2.2.2.1", }, { - IP: "2.2.2.2", + IP: "2.2.2.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p21", @@ -1600,63 +1632,68 @@ func Test_buildNewEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, - {"1.1.1.4:13", false}, + {"1.1.1.4:13", true}, }, makeServicePortName("ns1", "ep1", "p14"): { {"1.1.1.3:14", false}, - {"1.1.1.4:14", false}, + {"1.1.1.4:14", true}, }, makeServicePortName("ns2", "ep2", "p21"): { {"2.2.2.1:21", false}, - {"2.2.2.2:21", false}, + {"2.2.2.2:21", true}, }, makeServicePortName("ns2", "ep2", "p22"): { {"2.2.2.1:22", false}, - {"2.2.2.2:22", false}, + {"2.2.2.2:22", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, - {"1.1.1.4:13", false}, + {"1.1.1.4:13", true}, }, makeServicePortName("ns1", "ep1", "p14"): { {"1.1.1.3:14", false}, - {"1.1.1.4:14", false}, + {"1.1.1.4:14", true}, }, makeServicePortName("ns2", "ep2", "p21"): { {"2.2.2.1:21", false}, - {"2.2.2.2:21", false}, + {"2.2.2.2:21", true}, }, makeServicePortName("ns2", "ep2", "p22"): { {"2.2.2.1:22", false}, - {"2.2.2.2:22", false}, + {"2.2.2.2:22", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + types.NamespacedName{Namespace: "ns2", Name: "ep2"}: sets.NewString("ns2/ep2"), + }, }, { // Case[6]: add an Endpoints newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", + IP: "1.1.1.1", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Port: 11, @@ -1667,16 +1704,19 @@ func Test_buildNewEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + }, }, { // Case[7]: remove an Endpoints newEndpoints: []*api.Endpoints{ /* empty */ }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, @@ -1684,6 +1724,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", ""), }}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { // Case[8]: add an IP and port newEndpoints: []*api.Endpoints{ @@ -1692,7 +1733,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "1.1.1.1", }, { - IP: "1.1.1.2", + IP: "1.1.1.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1712,14 +1754,17 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + }, }, { // Case[9]: remove an IP and port newEndpoints: []*api.Endpoints{ @@ -1760,6 +1805,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { // Case[10]: add a subset newEndpoints: []*api.Endpoints{ @@ -1774,7 +1820,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }, { Addresses: []api.EndpointAddress{{ - IP: "2.2.2.2", + IP: "2.2.2.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p22", @@ -1793,10 +1840,13 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, makeServicePortName("ns1", "ep1", "p22"): { - {"2.2.2.2:22", false}, + {"2.2.2.2:22", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + }, }, { // Case[11]: remove a subset newEndpoints: []*api.Endpoints{ @@ -1829,6 +1879,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "2.2.2.2:22", servicePortName: makeServicePortName("ns1", "ep1", "p22"), }}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { // Case[12]: rename a port newEndpoints: []*api.Endpoints{ @@ -1858,6 +1909,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { // Case[13]: renumber a port newEndpoints: []*api.Endpoints{ @@ -1887,6 +1939,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, + expectedHealthchecks: map[types.NamespacedName]sets.String{}, }, { // Case[14]: complex add and remove newEndpoints: []*api.Endpoints{ @@ -1928,7 +1981,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { makeTestEndpoints("ns4", "ep4", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "4.4.4.4", + IP: "4.4.4.4", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p44", @@ -1942,18 +1996,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, makeServicePortName("ns2", "ep2", "p22"): { - {"2.2.2.2:22", false}, - {"2.2.2.22:22", false}, + {"2.2.2.2:22", true}, + {"2.2.2.22:22", true}, }, makeServicePortName("ns2", "ep2", "p23"): { - {"2.2.2.3:23", false}, + {"2.2.2.3:23", true}, }, makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", false}, - {"4.4.4.5:44", false}, + {"4.4.4.4:44", true}, + {"4.4.4.5:44", true}, }, makeServicePortName("ns4", "ep4", "p45"): { - {"4.4.4.6:45", false}, + {"4.4.4.6:45", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ @@ -1971,7 +2025,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"3.3.3.3:33", false}, }, makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", false}, + {"4.4.4.4:44", true}, }, }, expectedStale: []endpointServicePair{{ @@ -1990,10 +2044,14 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "4.4.4.6:45", servicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, + expectedHealthchecks: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns4", Name: "ep4"}: sets.NewString("ns4/ep4"), + }, }} for tci, tc := range testCases { - newMap, stale := buildNewEndpointsMap(tc.newEndpoints, tc.oldEndpoints, "host", fakeHealthChecker{}) + hc := newFakeHealthChecker() + newMap, stale := buildNewEndpointsMap(tc.newEndpoints, tc.oldEndpoints, nodeName, hc) if len(newMap) != len(tc.expectedResult) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) } @@ -2016,6 +2074,9 @@ func Test_buildNewEndpointsMap(t *testing.T) { t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale) } } + if !reflect.DeepEqual(hc.endpoints, tc.expectedHealthchecks) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, hc.endpoints) + } } }