diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b583c22998d..9868c363c51 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -154,9 +154,18 @@ type endpointsInfo struct { isLocal bool } +func (e *endpointsInfo) String() string { + return fmt.Sprintf("%v", *e) +} + // returns a new serviceInfo struct func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { - onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) + onlyNodeLocalEndpoints := false + if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && + (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) && + apiservice.NeedsHealthCheck(service) { + onlyNodeLocalEndpoints = true + } info := &serviceInfo{ clusterIP: net.ParseIP(service.Spec.ClusterIP), port: int(port.Port), @@ -543,7 +552,7 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap // Update endpoints for services. for i := range allEndpoints { - accumulateEndpointsMap(allEndpoints[i], hostname, curMap, &newMap) + accumulateEndpointsMap(allEndpoints[i], hostname, &newMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP @@ -601,10 +610,7 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap // - hostPortInfo and endpointsInfo overlap too much // - the test for this is overlapped by the test for buildNewEndpointsMap // - naming is poor and responsibilities are muddled -func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, - curEndpoints proxyEndpointMap, - newEndpoints *proxyEndpointMap) { - +func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. for i := range endpoints.Subsets { @@ -1306,7 +1312,9 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } proxier.portsMap = replacementPortsMap - // Update healthchecks. + // Update healthchecks. The endpoints list might include services that are + // not "OnlyLocal", but the services list will not, and the healthChecker + // will just drop those endpoints. if err := proxier.healthChecker.SyncServices(hcServices); err != nil { glog.Errorf("Error syncing healtcheck services: %v", err) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index ff8cf6deeee..9b40441ba71 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1212,13 +1212,11 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { func Test_accumulateEndpointsMap(t *testing.T) { testCases := []struct { newEndpoints *api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedNew map[proxy.ServicePortName][]*endpointsInfo + expected map[proxy.ServicePortName][]*endpointsInfo }{{ // Case[0]: nothing newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expected: map[proxy.ServicePortName][]*endpointsInfo{}, }, { // Case[1]: no changes, unnamed port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1234,12 +1232,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", false}, }, @@ -1259,12 +1252,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "port"): { - {"1.1.1.1:11", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "port"): { {"1.1.1.1:11", false}, }, @@ -1283,10 +1271,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): {}, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", false}, }, @@ -1294,12 +1279,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, { // Case[4]: remove port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expected: map[proxy.ServicePortName][]*endpointsInfo{}, }, { // Case[5]: new IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1320,12 +1300,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:11", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p1"): { {"1.1.1.1:11", false}, {"2.2.2.2:11", false}, @@ -1350,17 +1325,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:11", false}, - {"2.2.2.2:11", false}, - }, - makeServicePortName("ns1", "ep1", "p2"): { - {"1.1.1.1:22", false}, - {"2.2.2.2:22", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p1"): { {"1.1.1.1:11", false}, }, @@ -1380,12 +1345,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:11", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p2"): { {"1.1.1.1:11", false}, }, @@ -1405,12 +1365,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { }, } }), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:11", false}, - }, - }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + expected: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p1"): { {"1.1.1.1:22", false}, }, @@ -1420,18 +1375,18 @@ func Test_accumulateEndpointsMap(t *testing.T) { for tci, tc := range testCases { // outputs newEndpoints := make(proxyEndpointMap) - accumulateEndpointsMap(tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints) + accumulateEndpointsMap(tc.newEndpoints, "host", &newEndpoints) - if len(newEndpoints) != len(tc.expectedNew) { - t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) + if len(newEndpoints) != len(tc.expected) { + t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints)) } - for x := range tc.expectedNew { - if len(newEndpoints[x]) != len(tc.expectedNew[x]) { - t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedNew[x]), x, len(newEndpoints[x])) + for x := range tc.expected { + if len(newEndpoints[x]) != len(tc.expected[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expected[x]), x, len(newEndpoints[x])) } else { for i := range newEndpoints[x] { - if *(newEndpoints[x][i]) != *(tc.expectedNew[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedNew[x][i], *(newEndpoints[x][i])) + if *(newEndpoints[x][i]) != *(tc.expected[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expected[x][i], *(newEndpoints[x][i])) } } }