Part 2 of plural ports: make endpoints a struct

Includes conversions for v1b[12] and tests and fixups for call sites.
This commit is contained in:
Tim Hockin
2015-02-18 19:54:15 -08:00
parent 34eaa0dbd6
commit ae0062d001
25 changed files with 461 additions and 142 deletions

View File

@@ -183,7 +183,10 @@ func TestServicesFromZeroError(t *testing.T) {
}
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}}
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
@@ -230,7 +233,10 @@ func TestEndpoints(t *testing.T) {
}
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}}
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
}
fakeWatch := watch.NewFake()
fakeWatch.Stop()

View File

@@ -218,11 +218,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint1", "endpoint2"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint3", "endpoint4"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
handler2.Wait(2)
@@ -244,11 +244,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint1", "endpoint2"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint3", "endpoint4"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
handler2.Wait(2)
@@ -262,7 +262,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foobar"},
Endpoints: []string{"endpoint5", "endpoint6"},
Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}},
})
handler.Wait(1)
handler2.Wait(1)
@@ -274,7 +274,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint77"},
Endpoints: []api.Endpoint{{IP: "endpoint7"}},
})
handler.Wait(1)
handler2.Wait(1)

View File

@@ -23,6 +23,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"
@@ -102,8 +103,8 @@ func (fake *fakeIptables) IsIpv6() bool {
return false
}
var tcpServerPort string
var udpServerPort string
var tcpServerPort int
var udpServerPort int
func init() {
// Don't handle panics
@@ -118,20 +119,28 @@ func init() {
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
_, tcpServerPort, err = net.SplitHostPort(u.Host)
_, port, err := net.SplitHostPort(u.Host)
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
tcpServerPort, err = strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
}
// UDP setup.
udp, err := newUDPEchoServer()
if err != nil {
panic(fmt.Sprintf("failed to make a UDP server: %v", err))
}
_, udpServerPort, err = net.SplitHostPort(udp.LocalAddr().String())
_, port, err = net.SplitHostPort(udp.LocalAddr().String())
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
udpServerPort, err = strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
}
go udp.Loop()
}
@@ -188,7 +197,7 @@ func TestTCPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -208,7 +217,7 @@ func TestUDPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -237,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -268,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -299,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -329,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -359,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -398,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -437,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -473,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})

View File

@@ -149,23 +149,18 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
return endpoint, nil
}
func isValidEndpoint(spec string) bool {
_, port, err := net.SplitHostPort(spec)
if err != nil {
return false
}
value, err := strconv.Atoi(port)
if err != nil {
return false
}
return value > 0
func isValidEndpoint(ep *api.Endpoint) bool {
return ep.IP != "" && ep.Port > 0
}
func filterValidEndpoints(endpoints []string) []string {
func filterValidEndpoints(endpoints []api.Endpoint) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
var result []string
for _, spec := range endpoints {
if isValidEndpoint(spec) {
result = append(result, spec)
for i := range endpoints {
ep := &endpoints[i]
if isValidEndpoint(ep) {
result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
}
return result

View File

@@ -24,22 +24,28 @@ import (
)
func TestValidateWorks(t *testing.T) {
if isValidEndpoint("") {
if isValidEndpoint(&api.Endpoint{}) {
t.Errorf("Didn't fail for empty string")
}
if isValidEndpoint("foobar") {
if isValidEndpoint(&api.Endpoint{IP: "foobar"}) {
t.Errorf("Didn't fail with no port")
}
if isValidEndpoint("foobar:-1") {
if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint("foobar:8080") {
if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) {
t.Errorf("Failed a valid config.")
}
}
func TestFilterWorks(t *testing.T) {
endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"}
endpoints := []api.Endpoint{
{IP: "foobar", Port: 1},
{IP: "foobar", Port: 2},
{IP: "foobar", Port: -1},
{IP: "foobar", Port: 3},
{IP: "foobar", Port: -2},
}
filtered := filterValidEndpoints(endpoints)
if len(filtered) != 3 {
@@ -88,7 +94,7 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint1:40"},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
@@ -106,7 +112,11 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@@ -125,7 +135,11 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@@ -137,7 +151,10 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:8", "endpoint:9"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@@ -146,7 +163,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
@@ -164,11 +181,18 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
@@ -211,7 +235,7 @@ func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1"},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
@@ -234,7 +258,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@@ -262,7 +290,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@@ -293,7 +325,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@@ -308,7 +344,10 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@@ -325,7 +364,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:4"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 4},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@@ -351,7 +394,11 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@@ -365,7 +412,10 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@@ -376,7 +426,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
@@ -398,12 +448,19 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]