mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
buildPortsToEndpointsMap should use flattened value type
This commit is contained in:
parent
46e58df837
commit
2f671340c9
@ -21,7 +21,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,6 +28,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
|
"k8s.io/kubernetes/pkg/proxy/util"
|
||||||
"k8s.io/kubernetes/pkg/util/slice"
|
"k8s.io/kubernetes/pkg/util/slice"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -188,28 +188,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
|
|||||||
return endpoint, nil
|
return endpoint, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type hostPortPair struct {
|
|
||||||
host string
|
|
||||||
port int
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidEndpoint(hpp *hostPortPair) bool {
|
|
||||||
return hpp.host != "" && hpp.port > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func flattenValidEndpoints(endpoints []hostPortPair) []string {
|
|
||||||
// Convert Endpoint objects into strings for easier use later. Ignore
|
|
||||||
// the protocol field - we'll get that from the Service objects.
|
|
||||||
var result []string
|
|
||||||
for i := range endpoints {
|
|
||||||
hpp := &endpoints[i]
|
|
||||||
if isValidEndpoint(hpp) {
|
|
||||||
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
|
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
|
||||||
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
|
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
|
||||||
for _, affinity := range state.affinity.affinityMap {
|
for _, affinity := range state.affinity.affinityMap {
|
||||||
@ -243,33 +221,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
|
|
||||||
// portname. Expode Endpoints.Subsets[*] into this structure.
|
|
||||||
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
|
|
||||||
portsToEndpoints := map[string][]hostPortPair{}
|
|
||||||
for i := range endpoints.Subsets {
|
|
||||||
ss := &endpoints.Subsets[i]
|
|
||||||
for i := range ss.Ports {
|
|
||||||
port := &ss.Ports[i]
|
|
||||||
for i := range ss.Addresses {
|
|
||||||
addr := &ss.Addresses[i]
|
|
||||||
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
|
|
||||||
// Ignore the protocol field - we'll get that from the Service objects.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return portsToEndpoints
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||||
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
|
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
|
||||||
|
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
defer lb.lock.Unlock()
|
defer lb.lock.Unlock()
|
||||||
|
|
||||||
for portname := range portsToEndpoints {
|
for portname := range portsToEndpoints {
|
||||||
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
||||||
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
newEndpoints := portsToEndpoints[portname]
|
||||||
state, exists := lb.services[svcPort]
|
state, exists := lb.services[svcPort]
|
||||||
|
|
||||||
if !exists || state == nil || len(newEndpoints) > 0 {
|
if !exists || state == nil || len(newEndpoints) > 0 {
|
||||||
@ -289,8 +249,8 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||||
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
|
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
|
||||||
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
|
oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints)
|
||||||
registeredEndpoints := make(map[proxy.ServicePortName]bool)
|
registeredEndpoints := make(map[proxy.ServicePortName]bool)
|
||||||
|
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
@ -298,7 +258,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
|
|||||||
|
|
||||||
for portname := range portsToEndpoints {
|
for portname := range portsToEndpoints {
|
||||||
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
||||||
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
newEndpoints := portsToEndpoints[portname]
|
||||||
state, exists := lb.services[svcPort]
|
state, exists := lb.services[svcPort]
|
||||||
|
|
||||||
curEndpoints := []string{}
|
curEndpoints := []string{}
|
||||||
@ -344,7 +304,7 @@ func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
||||||
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
|
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
|
||||||
|
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
defer lb.lock.Unlock()
|
defer lb.lock.Unlock()
|
||||||
|
@ -26,45 +26,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestValidateWorks(t *testing.T) {
|
|
||||||
if isValidEndpoint(&hostPortPair{}) {
|
|
||||||
t.Errorf("Didn't fail for empty set")
|
|
||||||
}
|
|
||||||
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
|
|
||||||
t.Errorf("Didn't fail with invalid port")
|
|
||||||
}
|
|
||||||
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
|
|
||||||
t.Errorf("Didn't fail with a negative port")
|
|
||||||
}
|
|
||||||
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
|
|
||||||
t.Errorf("Failed a valid config.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFilterWorks(t *testing.T) {
|
|
||||||
endpoints := []hostPortPair{
|
|
||||||
{host: "foobar", port: 1},
|
|
||||||
{host: "foobar", port: 2},
|
|
||||||
{host: "foobar", port: -1},
|
|
||||||
{host: "foobar", port: 3},
|
|
||||||
{host: "foobar", port: -2},
|
|
||||||
}
|
|
||||||
filtered := flattenValidEndpoints(endpoints)
|
|
||||||
|
|
||||||
if len(filtered) != 3 {
|
|
||||||
t.Errorf("Failed to filter to the correct size")
|
|
||||||
}
|
|
||||||
if filtered[0] != "foobar:1" {
|
|
||||||
t.Errorf("Index zero is not foobar:1")
|
|
||||||
}
|
|
||||||
if filtered[1] != "foobar:2" {
|
|
||||||
t.Errorf("Index one is not foobar:2")
|
|
||||||
}
|
|
||||||
if filtered[2] != "foobar:3" {
|
|
||||||
t.Errorf("Index two is not foobar:3")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
|
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
|
||||||
loadBalancer := NewLoadBalancerRR()
|
loadBalancer := NewLoadBalancerRR()
|
||||||
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
|
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
@ -48,6 +49,30 @@ var (
|
|||||||
ErrNoAddresses = errors.New("No addresses for hostname")
|
ErrNoAddresses = errors.New("No addresses for hostname")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// isValidEndpoint checks that the given host / port pair are valid endpoint
|
||||||
|
func isValidEndpoint(host string, port int) bool {
|
||||||
|
return host != "" && port > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
|
||||||
|
// portname. Explode Endpoints.Subsets[*] into this structure.
|
||||||
|
func BuildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]string {
|
||||||
|
portsToEndpoints := map[string][]string{}
|
||||||
|
for i := range endpoints.Subsets {
|
||||||
|
ss := &endpoints.Subsets[i]
|
||||||
|
for i := range ss.Ports {
|
||||||
|
port := &ss.Ports[i]
|
||||||
|
for i := range ss.Addresses {
|
||||||
|
addr := &ss.Addresses[i]
|
||||||
|
if isValidEndpoint(addr.IP, int(port.Port)) {
|
||||||
|
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return portsToEndpoints
|
||||||
|
}
|
||||||
|
|
||||||
// IsZeroCIDR checks whether the input CIDR string is either
|
// IsZeroCIDR checks whether the input CIDR string is either
|
||||||
// the IPv4 or IPv6 zero CIDR
|
// the IPv4 or IPv6 zero CIDR
|
||||||
func IsZeroCIDR(cidr string) bool {
|
func IsZeroCIDR(cidr string) bool {
|
||||||
|
@ -19,6 +19,7 @@ package util
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
@ -28,6 +29,72 @@ import (
|
|||||||
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
|
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestValidateWorks(t *testing.T) {
|
||||||
|
if isValidEndpoint("", 0) {
|
||||||
|
t.Errorf("Didn't fail for empty set")
|
||||||
|
}
|
||||||
|
if isValidEndpoint("foobar", 0) {
|
||||||
|
t.Errorf("Didn't fail with invalid port")
|
||||||
|
}
|
||||||
|
if isValidEndpoint("foobar", -1) {
|
||||||
|
t.Errorf("Didn't fail with a negative port")
|
||||||
|
}
|
||||||
|
if !isValidEndpoint("foobar", 8080) {
|
||||||
|
t.Errorf("Failed a valid config.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildPortsToEndpointsMap(t *testing.T) {
|
||||||
|
endpoints := &v1.Endpoints{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "testnamespace"},
|
||||||
|
Subsets: []v1.EndpointSubset{
|
||||||
|
{
|
||||||
|
Addresses: []v1.EndpointAddress{
|
||||||
|
{IP: "10.0.0.1"},
|
||||||
|
{IP: "10.0.0.2"},
|
||||||
|
},
|
||||||
|
Ports: []v1.EndpointPort{
|
||||||
|
{Name: "http", Port: 80},
|
||||||
|
{Name: "https", Port: 443},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []v1.EndpointAddress{
|
||||||
|
{IP: "10.0.0.1"},
|
||||||
|
{IP: "10.0.0.3"},
|
||||||
|
},
|
||||||
|
Ports: []v1.EndpointPort{
|
||||||
|
{Name: "http", Port: 8080},
|
||||||
|
{Name: "dns", Port: 53},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []v1.EndpointAddress{},
|
||||||
|
Ports: []v1.EndpointPort{
|
||||||
|
{Name: "http", Port: 8888},
|
||||||
|
{Name: "ssh", Port: 22},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []v1.EndpointAddress{
|
||||||
|
{IP: "10.0.0.1"},
|
||||||
|
},
|
||||||
|
Ports: []v1.EndpointPort{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expectedPortsToEndpoints := map[string][]string{
|
||||||
|
"http": {"10.0.0.1:80", "10.0.0.2:80", "10.0.0.1:8080", "10.0.0.3:8080"},
|
||||||
|
"https": {"10.0.0.1:443", "10.0.0.2:443"},
|
||||||
|
"dns": {"10.0.0.1:53", "10.0.0.3:53"},
|
||||||
|
}
|
||||||
|
|
||||||
|
portsToEndpoints := BuildPortsToEndpointsMap(endpoints)
|
||||||
|
if !reflect.DeepEqual(expectedPortsToEndpoints, portsToEndpoints) {
|
||||||
|
t.Errorf("expected ports to endpoints not seen")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestIsProxyableIP(t *testing.T) {
|
func TestIsProxyableIP(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
ip string
|
ip string
|
||||||
|
@ -20,6 +20,7 @@ go_library(
|
|||||||
"//pkg/apis/core/v1/helper:go_default_library",
|
"//pkg/apis/core/v1/helper:go_default_library",
|
||||||
"//pkg/proxy:go_default_library",
|
"//pkg/proxy:go_default_library",
|
||||||
"//pkg/proxy/config:go_default_library",
|
"//pkg/proxy/config:go_default_library",
|
||||||
|
"//pkg/proxy/util:go_default_library",
|
||||||
"//pkg/util/ipconfig:go_default_library",
|
"//pkg/util/ipconfig:go_default_library",
|
||||||
"//pkg/util/netsh:go_default_library",
|
"//pkg/util/netsh:go_default_library",
|
||||||
"//pkg/util/slice:go_default_library",
|
"//pkg/util/slice:go_default_library",
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,6 +28,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
|
"k8s.io/kubernetes/pkg/proxy/util"
|
||||||
"k8s.io/kubernetes/pkg/util/slice"
|
"k8s.io/kubernetes/pkg/util/slice"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -178,28 +178,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
|
|||||||
return endpoint, nil
|
return endpoint, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type hostPortPair struct {
|
|
||||||
host string
|
|
||||||
port int
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidEndpoint(hpp *hostPortPair) bool {
|
|
||||||
return hpp.host != "" && hpp.port > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func flattenValidEndpoints(endpoints []hostPortPair) []string {
|
|
||||||
// Convert Endpoint objects into strings for easier use later. Ignore
|
|
||||||
// the protocol field - we'll get that from the Service objects.
|
|
||||||
var result []string
|
|
||||||
for i := range endpoints {
|
|
||||||
hpp := &endpoints[i]
|
|
||||||
if isValidEndpoint(hpp) {
|
|
||||||
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
|
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
|
||||||
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
|
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
|
||||||
for _, affinity := range state.affinity.affinityMap {
|
for _, affinity := range state.affinity.affinityMap {
|
||||||
@ -233,33 +211,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
|
|
||||||
// portname. Explode Endpoints.Subsets[*] into this structure.
|
|
||||||
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
|
|
||||||
portsToEndpoints := map[string][]hostPortPair{}
|
|
||||||
for i := range endpoints.Subsets {
|
|
||||||
ss := &endpoints.Subsets[i]
|
|
||||||
for i := range ss.Ports {
|
|
||||||
port := &ss.Ports[i]
|
|
||||||
for i := range ss.Addresses {
|
|
||||||
addr := &ss.Addresses[i]
|
|
||||||
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
|
|
||||||
// Ignore the protocol field - we'll get that from the Service objects.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return portsToEndpoints
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||||
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
|
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
|
||||||
|
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
defer lb.lock.Unlock()
|
defer lb.lock.Unlock()
|
||||||
|
|
||||||
for portname := range portsToEndpoints {
|
for portname := range portsToEndpoints {
|
||||||
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
||||||
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
newEndpoints := portsToEndpoints[portname]
|
||||||
state, exists := lb.services[svcPort]
|
state, exists := lb.services[svcPort]
|
||||||
|
|
||||||
if !exists || state == nil || len(newEndpoints) > 0 {
|
if !exists || state == nil || len(newEndpoints) > 0 {
|
||||||
@ -279,8 +239,8 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||||
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
|
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
|
||||||
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
|
oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints)
|
||||||
registeredEndpoints := make(map[proxy.ServicePortName]bool)
|
registeredEndpoints := make(map[proxy.ServicePortName]bool)
|
||||||
|
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
@ -288,7 +248,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
|
|||||||
|
|
||||||
for portname := range portsToEndpoints {
|
for portname := range portsToEndpoints {
|
||||||
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
|
||||||
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
newEndpoints := portsToEndpoints[portname]
|
||||||
state, exists := lb.services[svcPort]
|
state, exists := lb.services[svcPort]
|
||||||
|
|
||||||
curEndpoints := []string{}
|
curEndpoints := []string{}
|
||||||
@ -326,7 +286,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
||||||
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
|
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
|
||||||
|
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
defer lb.lock.Unlock()
|
defer lb.lock.Unlock()
|
||||||
|
@ -26,45 +26,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestValidateWorks(t *testing.T) {
|
|
||||||
if isValidEndpoint(&hostPortPair{}) {
|
|
||||||
t.Errorf("Didn't fail for empty set")
|
|
||||||
}
|
|
||||||
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
|
|
||||||
t.Errorf("Didn't fail with invalid port")
|
|
||||||
}
|
|
||||||
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
|
|
||||||
t.Errorf("Didn't fail with a negative port")
|
|
||||||
}
|
|
||||||
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
|
|
||||||
t.Errorf("Failed a valid config.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFilterWorks(t *testing.T) {
|
|
||||||
endpoints := []hostPortPair{
|
|
||||||
{host: "foobar", port: 1},
|
|
||||||
{host: "foobar", port: 2},
|
|
||||||
{host: "foobar", port: -1},
|
|
||||||
{host: "foobar", port: 3},
|
|
||||||
{host: "foobar", port: -2},
|
|
||||||
}
|
|
||||||
filtered := flattenValidEndpoints(endpoints)
|
|
||||||
|
|
||||||
if len(filtered) != 3 {
|
|
||||||
t.Errorf("Failed to filter to the correct size")
|
|
||||||
}
|
|
||||||
if filtered[0] != "foobar:1" {
|
|
||||||
t.Errorf("Index zero is not foobar:1")
|
|
||||||
}
|
|
||||||
if filtered[1] != "foobar:2" {
|
|
||||||
t.Errorf("Index one is not foobar:2")
|
|
||||||
}
|
|
||||||
if filtered[2] != "foobar:3" {
|
|
||||||
t.Errorf("Index two is not foobar:3")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
|
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
|
||||||
loadBalancer := NewLoadBalancerRR()
|
loadBalancer := NewLoadBalancerRR()
|
||||||
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
|
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
|
||||||
|
Loading…
Reference in New Issue
Block a user