diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 08b6eaecb02..417081fbb05 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -538,7 +538,7 @@ func runServiceTest(client *client.Client) { { Name: "c1", Image: "foo", - Ports: []api.Port{ + Ports: []api.ContainerPort{ {ContainerPort: 1234}, }, ImagePullPolicy: "PullIfNotPresent", diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index ed4b1a0e15f..6e50d76b879 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -24,6 +24,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -237,7 +239,13 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { func(ep *api.Endpoint, c fuzz.Continue) { // TODO: If our API used a particular type for IP fields we could just catch that here. ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256)) - ep.Port = c.Rand.Intn(65536) + // TODO: Once we drop single-port APIs, make this fuzz + // multiple ports and fuzz port.name. This will force + // a compile error when those APIs are deleted. + _ = v1beta1.Dependency + _ = v1beta2.Dependency + ep.Ports = []api.EndpointPort{{Name: "", Port: c.Rand.Intn(65536)}} + c.Fuzz(&ep.Ports[0].Protocol) }, ) return f diff --git a/pkg/api/types.go b/pkg/api/types.go index fecf1750ed9..5d80556ed5c 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -239,8 +239,8 @@ type SecretSource struct { Target ObjectReference `json:"target"` } -// Port represents a network port in a single container -type Port struct { +// ContainerPort represents a network port in a single container +type ContainerPort struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty"` @@ -346,9 +346,9 @@ type Container struct { // Optional: Defaults to whatever is defined in the image. Command []string `json:"command,omitempty"` // Optional: Defaults to Docker's default. - WorkingDir string `json:"workingDir,omitempty"` - Ports []Port `json:"ports,omitempty"` - Env []EnvVar `json:"env,omitempty"` + WorkingDir string `json:"workingDir,omitempty"` + Ports []ContainerPort `json:"ports,omitempty"` + Env []EnvVar `json:"env,omitempty"` // Compute resource requirements. Resources ResourceRequirements `json:"resources,omitempty"` VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"` @@ -750,9 +750,6 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` - // Optional: The IP protocol for these endpoints. Supports "TCP" and - // "UDP". Defaults to "TCP". - Protocol Protocol `json:"protocol,omitempty"` Endpoints []Endpoint `json:"endpoints,omitempty"` } @@ -762,8 +759,21 @@ type Endpoint struct { // TODO: This should allow hostname or IP, see #4447. IP string `json:"ip"` - // Required: The destination port to access. - Port int `json:"port"` + // The ports exposed on this IP. + Ports []EndpointPort +} + +type EndpointPort struct { + // Optional if only one port is defined in this Endpoint. 
+ // The name of this port within the larger service/endpoint structure. + // This must be a DNS_LABEL. + Name string + + // The IP protocol for this port. Supports "TCP" and "UDP". + Protocol Protocol + + // The destination port to access. + Port int } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index 6ba485bdbe9..eaa787d783a 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -1123,12 +1123,16 @@ func init() { if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { ep := &in.Endpoints[i] - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + // newer.Endpoints.Endpoints[i].Ports is an array - take the first one. + if len(ep.Ports) > 0 { + port := &ep.Ports[0] + if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port))) + } } return nil }, @@ -1139,22 +1143,20 @@ func init() { if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { - out.Endpoints = append(out.Endpoints, newer.Endpoint{}) - ep := &out.Endpoints[i] host, port, err := net.SplitHostPort(in.Endpoints[i]) if err != nil { return err } - ep.IP = host pn, err := strconv.Atoi(port) if err != nil { return err } - ep.Port = pn + epp := newer.EndpointPort{Port: pn} + if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}}) } return nil }, diff --git a/pkg/api/v1beta1/conversion_test.go b/pkg/api/v1beta1/conversion_test.go index df0009b4198..f5507ce6065 100644 --- a/pkg/api/v1beta1/conversion_test.go +++ b/pkg/api/v1beta1/conversion_test.go @@ -390,41 +390,35 @@ func TestEndpointsConversion(t *testing.T) { }{ { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "empty", - }, - Protocol: current.ProtocolTCP, + Protocol: "", Endpoints: []string{}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, Endpoints: []newer.Endpoint{}, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "one", - }, Protocol: current.ProtocolTCP, Endpoints: []string{"1.2.3.4:88"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}}, + }, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "several", - }, Protocol: current.ProtocolUDP, Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolUDP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}}, + }, }, }, } @@ -436,7 +430,7 @@ func TestEndpointsConversion(t *testing.T) { t.Errorf("[Case: %d] Unexpected error: %v", 
i, err) continue } - if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) } diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go index c833f9ffb60..e71db4f6ffa 100644 --- a/pkg/api/v1beta1/defaults.go +++ b/pkg/api/v1beta1/defaults.go @@ -32,7 +32,7 @@ func init() { } } }, - func(obj *Port) { + func(obj *ContainerPort) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP } @@ -86,7 +86,7 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" { + if obj.Protocol == "" && len(obj.Endpoints) > 0 { obj.Protocol = "TCP" } }, diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go index d7372045247..2127ee0f647 100644 --- a/pkg/api/v1beta1/defaults_test.go +++ b/pkg/api/v1beta1/defaults_test.go @@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) { func TestSetDefaultContainer(t *testing.T) { bp := ¤t.BoundPod{} bp.Spec.Containers = []current.Container{{}} - bp.Spec.Containers[0].Ports = []current.Port{{}} + bp.Spec.Containers[0].Ports = []current.ContainerPort{{}} obj2 := roundTrip(t, runtime.Object(bp)) bp2 := obj2.(*current.BoundPod) @@ -103,11 +103,21 @@ func TestSetDefaultSecret(t *testing.T) { } } -func TestSetDefaulEndpointsProtocol(t *testing.T) { +func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) { in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) + if out.Protocol != "" { + t.Errorf("Expected protocol \"\", got %s", out.Protocol) + } +} + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{Endpoints: []string{"1.2.3.4:5678"}} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + if out.Protocol != current.ProtocolTCP { t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) } diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index f61f806f08a..d98c038230c 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -24,6 +24,12 @@ import ( // Codec encodes internal objects to the v1beta1 scheme var Codec = runtime.CodecFor(api.Scheme, "v1beta1") +// Dependency does nothing but give a hook for other packages to force a +// compile-time error when this API version is eventually removed. This is +// useful, for example, to clean up things that are implicitly tied to +// semantics of older APIs. +const Dependency = true + func init() { api.Scheme.AddKnownTypes("v1beta1", &Pod{}, diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index c38960d925c..53b6051d2c7 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -161,8 +161,8 @@ type SecretSource struct { Target ObjectReference `json:"target" description:"target is a reference to a secret"` } -// Port represents a network port in a single container -type Port struct { +// ContainerPort represents a network port in a single container +type ContainerPort struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. 
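As an illustration (not part of this patch): the v1beta1 conversion above maps the old wire format — a list of "ip:port" strings plus one shared Protocol — onto the new internal Endpoint with per-port protocols, and the reverse direction keeps only the first port of each endpoint. A self-contained sketch of that mapping, using local stand-in types rather than the real api packages:

package main

import (
	"fmt"
	"net"
	"strconv"
)

// Local stand-ins for the internal Endpoint and EndpointPort types in this diff.
type EndpointPort struct {
	Name     string
	Protocol string
	Port     int
}

type Endpoint struct {
	IP    string
	Ports []EndpointPort
}

// oldToNew mirrors the v1beta1 -> internal direction: each "ip:port" string
// becomes an Endpoint with a single port that carries the shared protocol.
func oldToNew(protocol string, endpoints []string) ([]Endpoint, error) {
	out := []Endpoint{}
	for _, e := range endpoints {
		host, portStr, err := net.SplitHostPort(e)
		if err != nil {
			return nil, err
		}
		port, err := strconv.Atoi(portStr)
		if err != nil {
			return nil, err
		}
		out = append(out, Endpoint{IP: host, Ports: []EndpointPort{{Protocol: protocol, Port: port}}})
	}
	return out, nil
}

// newToOld mirrors the internal -> v1beta1 direction: only the first port of
// each Endpoint is representable, and its protocol is hoisted to the top level.
func newToOld(endpoints []Endpoint) (protocol string, flat []string) {
	for i := range endpoints {
		ep := &endpoints[i]
		if len(ep.Ports) == 0 {
			continue // nothing the old format can express
		}
		protocol = ep.Ports[0].Protocol
		flat = append(flat, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Ports[0].Port)))
	}
	return protocol, flat
}

func main() {
	eps, err := oldToNew("TCP", []string{"1.2.3.4:88", "1.2.3.4:89"})
	if err != nil {
		panic(err)
	}
	proto, flat := newToOld(eps)
	fmt.Println(proto, flat) // TCP [1.2.3.4:88 1.2.3.4:89]
}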
Name string `json:"name,omitempty" description:"name for the port that can be referred to by services; must be a DNS_LABEL and unique without the pod"` @@ -283,7 +283,7 @@ type Container struct { Command []string `json:"command,omitempty" description:"command argv array; not executed within a shell; defaults to entrypoint or command in the image"` // Optional: Defaults to Docker's default. WorkingDir string `json:"workingDir,omitempty" description:"container's working directory; defaults to image's default"` - Ports []Port `json:"ports,omitempty" description:"list of ports to expose from the container"` + Ports []ContainerPort `json:"ports,omitempty" description:"list of ports to expose from the container"` Env []EnvVar `json:"env,omitempty" description:"list of environment variables to set in the container"` Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"` // Optional: Defaults to unlimited. diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 837b9366a90..44371412c91 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -1038,12 +1038,16 @@ func init() { if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { ep := &in.Endpoints[i] - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + // newer.Endpoints.Endpoints[i].Ports is an array - take the first one. + if len(ep.Ports) > 0 { + port := &ep.Ports[0] + if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port))) + } } return nil }, @@ -1054,22 +1058,20 @@ func init() { if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { - out.Endpoints = append(out.Endpoints, newer.Endpoint{}) - ep := &out.Endpoints[i] host, port, err := net.SplitHostPort(in.Endpoints[i]) if err != nil { return err } - ep.IP = host pn, err := strconv.Atoi(port) if err != nil { return err } - ep.Port = pn + epp := newer.EndpointPort{Port: pn} + if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}}) } return nil }, diff --git a/pkg/api/v1beta2/conversion_test.go b/pkg/api/v1beta2/conversion_test.go index b606476466d..ac88b233d40 100644 --- a/pkg/api/v1beta2/conversion_test.go +++ b/pkg/api/v1beta2/conversion_test.go @@ -220,41 +220,35 @@ func TestEndpointsConversion(t *testing.T) { }{ { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "empty", - }, - Protocol: current.ProtocolTCP, + Protocol: "", Endpoints: []string{}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, Endpoints: []newer.Endpoint{}, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "one", - }, Protocol: current.ProtocolTCP, Endpoints: []string{"1.2.3.4:88"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}}, + }, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "several", - }, 
Protocol: current.ProtocolUDP, Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolUDP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}}, + }, }, }, } @@ -266,7 +260,7 @@ func TestEndpointsConversion(t *testing.T) { t.Errorf("[Case: %d] Unexpected error: %v", i, err) continue } - if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) } diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go index 9242bf3a323..c5bbdebf830 100644 --- a/pkg/api/v1beta2/defaults.go +++ b/pkg/api/v1beta2/defaults.go @@ -34,7 +34,7 @@ func init() { } } }, - func(obj *Port) { + func(obj *ContainerPort) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP } @@ -88,7 +88,7 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" { + if obj.Protocol == "" && len(obj.Endpoints) > 0 { obj.Protocol = "TCP" } }, diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go index a73c83a93e6..1141f53d343 100644 --- a/pkg/api/v1beta2/defaults_test.go +++ b/pkg/api/v1beta2/defaults_test.go @@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) { func TestSetDefaultContainer(t *testing.T) { bp := ¤t.BoundPod{} bp.Spec.Containers = []current.Container{{}} - bp.Spec.Containers[0].Ports = []current.Port{{}} + bp.Spec.Containers[0].Ports = []current.ContainerPort{{}} obj2 := roundTrip(t, runtime.Object(bp)) bp2 := obj2.(*current.BoundPod) @@ -103,11 +103,21 @@ func TestSetDefaultSecret(t *testing.T) { } } -func TestSetDefaulEndpointsProtocol(t *testing.T) { +func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) { in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) + if out.Protocol != "" { + t.Errorf("Expected protocol \"\", got %s", out.Protocol) + } +} + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{Endpoints: []string{"1.2.3.4:5678"}} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + if out.Protocol != current.ProtocolTCP { t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) } diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index 990aa7b2039..39612227928 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -24,6 +24,12 @@ import ( // Codec encodes internal objects to the v1beta2 scheme var Codec = runtime.CodecFor(api.Scheme, "v1beta2") +// Dependency does nothing but give a hook for other packages to force a +// compile-time error when this API version is eventually removed. This is +// useful, for example, to clean up things that are implicitly tied to +// semantics of older APIs. 
+const Dependency = true + func init() { api.Scheme.AddKnownTypes("v1beta2", &Pod{}, diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index c3518bf7573..d5d5957dc62 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -99,8 +99,8 @@ const ( ProtocolUDP Protocol = "UDP" ) -// Port represents a network port in a single container. -type Port struct { +// ContainerPort represents a network port in a single container. +type ContainerPort struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty" description:"name for the port that can be referred to by services; must be a DNS_LABEL and unique without the pod"` @@ -242,7 +242,7 @@ type Container struct { Command []string `json:"command,omitempty" description:"command argv array; not executed within a shell; defaults to entrypoint or command in the image"` // Optional: Defaults to Docker's default. WorkingDir string `json:"workingDir,omitempty" description:"container's working directory; defaults to image's default"` - Ports []Port `json:"ports,omitempty" description:"list of ports to expose from the container"` + Ports []ContainerPort `json:"ports,omitempty" description:"list of ports to expose from the container"` Env []EnvVar `json:"env,omitempty" description:"list of environment variables to set in the container"` Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"` // Optional: Defaults to unlimited. diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index 3149601fb64..03fe748706a 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -32,7 +32,7 @@ func init() { } } }, - func(obj *Port) { + func(obj *ContainerPort) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP } @@ -81,8 +81,14 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" { - obj.Protocol = "TCP" + for i := range obj.Endpoints { + ep := &obj.Endpoints[i] + for j := range ep.Ports { + port := &ep.Ports[j] + if port.Protocol == "" { + port.Protocol = ProtocolTCP + } + } } }, ) diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index e1ebf388b1e..2ac72ae3360 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) { func TestSetDefaultContainer(t *testing.T) { bp := ¤t.BoundPod{} bp.Spec.Containers = []current.Container{{}} - bp.Spec.Containers[0].Ports = []current.Port{{}} + bp.Spec.Containers[0].Ports = []current.ContainerPort{{}} obj2 := roundTrip(t, runtime.Object(bp)) bp2 := obj2.(*current.BoundPod) @@ -104,11 +104,25 @@ func TestSetDefaultSecret(t *testing.T) { } func TestSetDefaulEndpointsProtocol(t *testing.T) { - in := ¤t.Endpoints{} + in := ¤t.Endpoints{ + Endpoints: []current.Endpoint{ + {IP: "1.2.3.4", Ports: []current.EndpointPort{ + {Protocol: "TCP"}, + {Protocol: "UDP"}, + {Protocol: ""}, + }}, + }, + } obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) - if out.Protocol != current.ProtocolTCP { - t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + if out.Endpoints[0].Ports[0].Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol[0] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[0].Protocol) + } + if out.Endpoints[0].Ports[1].Protocol != current.ProtocolUDP { + t.Errorf("Expected protocol[1] %s, got %s", current.ProtocolTCP, 
out.Endpoints[0].Ports[1].Protocol) + } + if out.Endpoints[0].Ports[2].Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol[2] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[2].Protocol) } } diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index ca6c22d82bf..68640eb68e4 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -254,8 +254,8 @@ type SecretSource struct { Target ObjectReference `json:"target" description:"target is a reference to a secret"` } -// Port represents a network port in a single container. -type Port struct { +// ContainerPort represents a network port in a single container. +type ContainerPort struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty"` @@ -367,7 +367,7 @@ type Container struct { Command []string `json:"command,omitempty"` // Optional: Defaults to Docker's default. WorkingDir string `json:"workingDir,omitempty"` - Ports []Port `json:"ports,omitempty"` + Ports []ContainerPort `json:"ports,omitempty"` Env []EnvVar `json:"env,omitempty"` Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"` VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"` @@ -782,10 +782,6 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` - // Optional: The IP protocol for these endpoints. Supports "TCP" and - // "UDP". Defaults to "TCP". - Protocol Protocol `json:"protocol,omitempty"` - Endpoints []Endpoint `json:"endpoints,omitempty"` } @@ -795,6 +791,20 @@ type Endpoint struct { // TODO: This should allow hostname or IP, see #4447. IP string `json:"ip"` + // The ports exposed on this IP. + Ports []EndpointPort `json:"ports,omitempty"` +} + +type EndpointPort struct { + // Optional if only one port is defined in this Endpoint, otherwise required. + // The name of this port within the larger service/endpoint structure. + // This must be a DNS_LABEL. + Name string `json:"name,omitempty"` + + // Optional: The IP protocol for this port. Supports "TCP" and "UDP". + // Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` + // Required: The destination port to access. Port int `json:"port"` } diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 619d6cde9bf..62bc52b66fc 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -303,7 +303,7 @@ func validateSecretSource(secretSource *api.SecretSource) errs.ValidationErrorLi var supportedPortProtocols = util.NewStringSet(string(api.ProtocolTCP), string(api.ProtocolUDP)) -func validatePorts(ports []api.Port) errs.ValidationErrorList { +func validatePorts(ports []api.ContainerPort) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} allNames := util.StringSet{} @@ -388,7 +388,7 @@ func validateProbe(probe *api.Probe) errs.ValidationErrorList { // AccumulateUniquePorts runs an extraction function on each Port of each Container, // accumulating the results and returning an error if any ports conflict. 
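As an illustration (not part of this patch): with Protocol now living on each EndpointPort, the v1beta3 defaulting above walks every port of every endpoint instead of touching one top-level field. The same nested loop in isolation, over local stand-in types:

package main

import "fmt"

type EndpointPort struct {
	Name     string
	Protocol string
	Port     int
}

type Endpoint struct {
	IP    string
	Ports []EndpointPort
}

// defaultProtocols fills in "TCP" for any port whose protocol is unset and
// leaves explicitly-set protocols alone, matching the v1beta3 defaulting above.
func defaultProtocols(endpoints []Endpoint) {
	for i := range endpoints {
		for j := range endpoints[i].Ports {
			if endpoints[i].Ports[j].Protocol == "" {
				endpoints[i].Ports[j].Protocol = "TCP"
			}
		}
	}
}

func main() {
	eps := []Endpoint{{IP: "1.2.3.4", Ports: []EndpointPort{{Protocol: "UDP", Port: 53}, {Port: 80}}}}
	defaultProtocols(eps)
	fmt.Println(eps) // the port with no protocol now reads TCP; the UDP one is untouched
}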
-func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, extract func(*api.Port) int) errs.ValidationErrorList { +func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, extract func(*api.ContainerPort) int) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} for ci, ctr := range containers { @@ -413,7 +413,7 @@ func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, // a slice of containers. func checkHostPortConflicts(containers []api.Container) errs.ValidationErrorList { allPorts := map[int]bool{} - return AccumulateUniquePorts(containers, allPorts, func(p *api.Port) int { return p.HostPort }) + return AccumulateUniquePorts(containers, allPorts, func(p *api.ContainerPort) int { return p.HostPort }) } func validateExecAction(exec *api.ExecAction) errs.ValidationErrorList { diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 9e7e3c7647c..bcba1458e8b 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -195,7 +195,7 @@ func TestValidateVolumes(t *testing.T) { } func TestValidatePorts(t *testing.T) { - successCase := []api.Port{ + successCase := []api.ContainerPort{ {Name: "abc", ContainerPort: 80, HostPort: 80, Protocol: "TCP"}, {Name: "easy", ContainerPort: 82, Protocol: "TCP"}, {Name: "as", ContainerPort: 83, Protocol: "UDP"}, @@ -207,7 +207,7 @@ func TestValidatePorts(t *testing.T) { t.Errorf("expected success: %v", errs) } - nonCanonicalCase := []api.Port{ + nonCanonicalCase := []api.ContainerPort{ {ContainerPort: 80, Protocol: "TCP"}, } if errs := validatePorts(nonCanonicalCase); len(errs) != 0 { @@ -215,22 +215,22 @@ func TestValidatePorts(t *testing.T) { } errorCases := map[string]struct { - P []api.Port + P []api.ContainerPort T errors.ValidationErrorType F string D string }{ - "name > 63 characters": {[]api.Port{{Name: strings.Repeat("a", 64), ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, - "name not a DNS label": {[]api.Port{{Name: "a.b.c", ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, - "name not unique": {[]api.Port{ + "name > 63 characters": {[]api.ContainerPort{{Name: strings.Repeat("a", 64), ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, + "name not a DNS label": {[]api.ContainerPort{{Name: "a.b.c", ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, + "name not unique": {[]api.ContainerPort{ {Name: "abc", ContainerPort: 80, Protocol: "TCP"}, {Name: "abc", ContainerPort: 81, Protocol: "TCP"}, }, errors.ValidationErrorTypeDuplicate, "[1].name", ""}, - "zero container port": {[]api.Port{{ContainerPort: 0, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, - "invalid container port": {[]api.Port{{ContainerPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, - "invalid host port": {[]api.Port{{ContainerPort: 80, HostPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].hostPort", portRangeErrorMsg}, - "invalid protocol": {[]api.Port{{ContainerPort: 80, Protocol: "ICMP"}}, errors.ValidationErrorTypeNotSupported, "[0].protocol", ""}, - "protocol required": {[]api.Port{{Name: "abc", ContainerPort: 80}}, errors.ValidationErrorTypeRequired, "[0].protocol", ""}, + "zero 
container port": {[]api.ContainerPort{{ContainerPort: 0, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, + "invalid container port": {[]api.ContainerPort{{ContainerPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, + "invalid host port": {[]api.ContainerPort{{ContainerPort: 80, HostPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].hostPort", portRangeErrorMsg}, + "invalid protocol": {[]api.ContainerPort{{ContainerPort: 80, Protocol: "ICMP"}}, errors.ValidationErrorTypeNotSupported, "[0].protocol", ""}, + "protocol required": {[]api.ContainerPort{{Name: "abc", ContainerPort: 80}}, errors.ValidationErrorTypeRequired, "[0].protocol", ""}, } for k, v := range errorCases { errs := validatePorts(v.P) @@ -433,9 +433,9 @@ func TestValidateContainers(t *testing.T) { }, "zero-length image": {{Name: "abc", Image: "", ImagePullPolicy: "IfNotPresent"}}, "host port not unique": { - {Name: "abc", Image: "image", Ports: []api.Port{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}}, + {Name: "abc", Image: "image", Ports: []api.ContainerPort{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}}, ImagePullPolicy: "IfNotPresent"}, - {Name: "def", Image: "image", Ports: []api.Port{{ContainerPort: 81, HostPort: 80, Protocol: "TCP"}}, + {Name: "def", Image: "image", Ports: []api.ContainerPort{{ContainerPort: 81, HostPort: 80, Protocol: "TCP"}}, ImagePullPolicy: "IfNotPresent"}, }, "invalid env var name": { @@ -587,7 +587,7 @@ func TestValidateManifest(t *testing.T) { "memory": resource.MustParse("1"), }, }, - Ports: []api.Port{ + Ports: []api.ContainerPort{ {Name: "p1", ContainerPort: 80, HostPort: 8080, Protocol: "TCP"}, {Name: "p2", ContainerPort: 81, Protocol: "TCP"}, {ContainerPort: 82, Protocol: "TCP"}, @@ -934,7 +934,7 @@ func TestValidatePodUpdate(t *testing.T) { Containers: []api.Container{ { Image: "foo:V1", - Ports: []api.Port{ + Ports: []api.ContainerPort{ {HostPort: 8080, ContainerPort: 80}, }, }, @@ -947,7 +947,7 @@ func TestValidatePodUpdate(t *testing.T) { Containers: []api.Container{ { Image: "foo:V2", - Ports: []api.Port{ + Ports: []api.ContainerPort{ {HostPort: 8000, ContainerPort: 80}, }, }, diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 096b887dbcf..e3dc1d213a5 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -616,7 +616,9 @@ func TestListEndpooints(t *testing.T) { { ObjectMeta: api.ObjectMeta{Name: "endpoint-1"}, Endpoints: []api.Endpoint{ - {IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}}, + {IP: "10.245.1.2", Ports: []api.EndpointPort{{Port: 8080}}}, + {IP: "10.245.1.3", Ports: []api.EndpointPort{{Port: 8080}}}, + }, }, }, }, diff --git a/pkg/constraint/constraint_test.go b/pkg/constraint/constraint_test.go index eae45221270..7f76ddf196b 100644 --- a/pkg/constraint/constraint_test.go +++ b/pkg/constraint/constraint_test.go @@ -26,7 +26,7 @@ import ( func containerWithHostPorts(ports ...int) api.Container { c := api.Container{} for _, p := range ports { - c.Ports = append(c.Ports, api.Port{HostPort: p}) + c.Ports = append(c.Ports, api.ContainerPort{HostPort: p}) } return c } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index f3cf2714a6d..05b7024fa85 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -22,10 +22,8 @@ import ( "fmt" "io" "io/ioutil" - "net" "reflect" "sort" - "strconv" "strings" "text/tabwriter" "text/template" @@ -270,10 
+268,11 @@ func formatEndpoints(endpoints []api.Endpoint) string { return "" } list := []string{} - for i := range endpoints { - ep := &endpoints[i] - list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) - } + //FIXME: What do we want to print, now that endpoints are more complex? + //for i := range endpoints { + // ep := &endpoints[i] + // list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + //} return strings.Join(list, ",") } diff --git a/pkg/kubectl/resource_printer_test.go b/pkg/kubectl/resource_printer_test.go index cc6f7ab6b6b..fff9bf09ffd 100644 --- a/pkg/kubectl/resource_printer_test.go +++ b/pkg/kubectl/resource_printer_test.go @@ -452,7 +452,10 @@ func TestPrinters(t *testing.T) { "pod": &api.Pod{ObjectMeta: om("pod")}, "emptyPodList": &api.PodList{}, "nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}}, - "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}}, + "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{ + {IP: "127.0.0.1"}, + {IP: "localhost", Ports: []api.EndpointPort{{Port: 8080}}}, + }}, } // map of printer name to set of objects it should fail on. expectedErrors := map[string]util.StringSet{ diff --git a/pkg/kubectl/run.go b/pkg/kubectl/run.go index 3b8e69e1cf2..4e6933be39f 100644 --- a/pkg/kubectl/run.go +++ b/pkg/kubectl/run.go @@ -82,7 +82,7 @@ func (BasicReplicationController) Generate(params map[string]string) (runtime.Ob // Don't include the port if it was not specified. if port > 0 { - controller.Spec.Template.Spec.Containers[0].Ports = []api.Port{ + controller.Spec.Template.Spec.Containers[0].Ports = []api.ContainerPort{ { ContainerPort: port, }, diff --git a/pkg/kubectl/run_test.go b/pkg/kubectl/run_test.go index f009d81c4ea..760b74ca707 100644 --- a/pkg/kubectl/run_test.go +++ b/pkg/kubectl/run_test.go @@ -84,7 +84,7 @@ func TestGenerate(t *testing.T) { { Name: "foo", Image: "someimage", - Ports: []api.Port{ + Ports: []api.ContainerPort{ { ContainerPort: 80, }, diff --git a/pkg/kubelet/handlers_test.go b/pkg/kubelet/handlers_test.go index a211f84a5a3..8f4525a4e7c 100644 --- a/pkg/kubelet/handlers_test.go +++ b/pkg/kubelet/handlers_test.go @@ -38,7 +38,7 @@ func TestResolvePortString(t *testing.T) { expected := 80 name := "foo" container := &api.Container{ - Ports: []api.Port{ + Ports: []api.ContainerPort{ {Name: name, ContainerPort: expected}, }, } @@ -55,7 +55,7 @@ func TestResolvePortStringUnknown(t *testing.T) { expected := 80 name := "foo" container := &api.Container{ - Ports: []api.Port{ + Ports: []api.ContainerPort{ {Name: "bar", ContainerPort: expected}, }, } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a7fcc45a956..70df616a16d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -917,7 +917,7 @@ const ( // createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container. func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.DockerID, error) { - var ports []api.Port + var ports []api.ContainerPort // Docker only exports ports from the pod infra container. Let's // collect all of the relevant ports and export them. 
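As an illustration (not part of this patch, and not a decision on the FIXME in kubectl's formatEndpoints above): one possible rendering would flatten every ip:port combination and fall back to the bare IP when an endpoint lists no ports. Purely a sketch, with local stand-in types:

package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

type EndpointPort struct {
	Name     string
	Protocol string
	Port     int
}

type Endpoint struct {
	IP    string
	Ports []EndpointPort
}

// formatEndpoints joins every ip:port combination, printing a bare IP when an
// endpoint lists no ports. This is one option, not the project's choice.
func formatEndpoints(endpoints []Endpoint) string {
	list := []string{}
	for i := range endpoints {
		ep := &endpoints[i]
		if len(ep.Ports) == 0 {
			list = append(list, ep.IP)
			continue
		}
		for _, p := range ep.Ports {
			list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(p.Port)))
		}
	}
	return strings.Join(list, ",")
}

func main() {
	fmt.Println(formatEndpoints([]Endpoint{
		{IP: "127.0.0.1"},
		{IP: "10.0.0.2", Ports: []EndpointPort{{Port: 8080}, {Port: 9090}}},
	}))
	// 127.0.0.1,10.0.0.2:8080,10.0.0.2:9090
}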
for _, container := range pod.Spec.Containers { @@ -1411,7 +1411,7 @@ func updateBoundPods(changed []api.BoundPod, current []api.BoundPod) []api.Bound func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { filtered := []api.BoundPod{} ports := map[int]bool{} - extract := func(p *api.Port) int { return p.HostPort } + extract := func(p *api.ContainerPort) int { return p.HostPort } for i := range pods { pod := &pods[i] if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b1e45591e3e..46d8fa67bea 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1136,7 +1136,7 @@ func TestMakeVolumesAndBinds(t *testing.T) { func TestMakePortsAndBindings(t *testing.T) { container := api.Container{ - Ports: []api.Port{ + Ports: []api.ContainerPort{ { ContainerPort: 80, HostPort: 8080, @@ -1200,12 +1200,12 @@ func TestMakePortsAndBindings(t *testing.T) { func TestCheckHostPortConflicts(t *testing.T) { successCaseAll := []api.BoundPod{ - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}}, } successCaseNew := api.BoundPod{ - Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}, } expected := append(successCaseAll, successCaseNew) if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { @@ -1213,12 +1213,12 @@ func TestCheckHostPortConflicts(t *testing.T) { } failureCaseAll := []api.BoundPod{ - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}}, } failureCaseNew := api.BoundPod{ - Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}, } if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { t.Errorf("Expected %#v, Got %#v", expected, actual) diff --git a/pkg/kubelet/probe_test.go b/pkg/kubelet/probe_test.go index 4a22e5e8a64..988ebf55cdf 100644 --- a/pkg/kubelet/probe_test.go +++ b/pkg/kubelet/probe_test.go @@ -32,7 +32,7 @@ import ( func TestFindPortByName(t *testing.T) { container := api.Container{ - Ports: []api.Port{ + Ports: []api.ContainerPort{ { Name: "foo", HostPort: 8080, @@ -71,7 +71,7 @@ func TestGetURLParts(t *testing.T) { for _, test := range testCases { state := api.PodStatus{PodIP: "127.0.0.1"} 
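As an illustration (not part of this patch): filterHostPortConflicts above and checkHostPortConflicts in validation.go both feed AccumulateUniquePorts an extractor over the renamed ContainerPort, so the conflict check itself keeps its extractor-plus-shared-map shape. A standalone sketch of that accumulation, with local stand-in types and treating an unset HostPort of 0 as non-conflicting for the sketch:

package main

import "fmt"

type ContainerPort struct {
	Name          string
	HostPort      int
	ContainerPort int
}

type Container struct {
	Name  string
	Ports []ContainerPort
}

// accumulateUniquePorts reports host ports that appear more than once across
// all containers, accumulating into a shared map supplied by the caller.
func accumulateUniquePorts(containers []Container, seen map[int]bool, extract func(*ContainerPort) int) []int {
	dups := []int{}
	for ci := range containers {
		for pi := range containers[ci].Ports {
			port := extract(&containers[ci].Ports[pi])
			if port == 0 {
				continue // unset host port cannot conflict in this sketch
			}
			if seen[port] {
				dups = append(dups, port)
			}
			seen[port] = true
		}
	}
	return dups
}

func main() {
	containers := []Container{
		{Name: "a", Ports: []ContainerPort{{HostPort: 80}}},
		{Name: "b", Ports: []ContainerPort{{HostPort: 80}, {HostPort: 81}}},
	}
	dups := accumulateUniquePorts(containers, map[int]bool{}, func(p *ContainerPort) int { return p.HostPort })
	fmt.Println(dups) // [80]
}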
container := api.Container{ - Ports: []api.Port{{Name: "found", HostPort: 93}}, + Ports: []api.ContainerPort{{Name: "found", HostPort: 93}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ HTTPGet: test.probe, @@ -114,7 +114,7 @@ func TestGetTCPAddrParts(t *testing.T) { for _, test := range testCases { host := "1.2.3.4" container := api.Container{ - Ports: []api.Port{{Name: "found", HostPort: 93}}, + Ports: []api.ContainerPort{{Name: "found", HostPort: 93}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ TCPSocket: test.probe, diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 7cc73559965..ce9e1b53e76 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -128,25 +128,35 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) - if err != nil || e.Protocol != api.ProtocolTCP { + if err != nil { e = &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: serviceName, Namespace: api.NamespaceDefault, }, - Protocol: api.ProtocolTCP, } } found := false +FindEndpointLoop: for i := range e.Endpoints { ep := &e.Endpoints[i] - if ep.IP == ip.String() && ep.Port == port { - found = true - break + if ep.IP == ip.String() { + for j := range ep.Ports { + epp := &ep.Ports[j] + if epp.Protocol == api.ProtocolTCP && epp.Port == port { + found = true + break FindEndpointLoop + } + } } } if !found { - e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port}) + e.Endpoints = append(e.Endpoints, api.Endpoint{ + IP: ip.String(), + Ports: []api.EndpointPort{ + {Protocol: api.ProtocolTCP, Port: port}, + }, + }) } if len(e.Endpoints) > m.masterCount { // We append to the end and remove from the beginning, so this should diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 29489f839f3..1b4430fc551 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -185,7 +185,7 @@ func TestServicesFromZeroError(t *testing.T) { func TestEndpoints(t *testing.T) { endpoint := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}, } fakeWatch := watch.NewFake() @@ -235,7 +235,7 @@ func TestEndpoints(t *testing.T) { func TestEndpointsFromZero(t *testing.T) { endpoint := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}, } fakeWatch := watch.NewFake() diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index a94665383d8..bb9f93bf269 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -25,8 +25,8 @@ import ( // LoadBalancer is an interface for distributing incoming requests to service endpoints. type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given - // service and source address. - NextEndpoint(service string, srcAddr net.Addr) (string, error) - NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error - CleanupStaleStickySessions(service string) + // serviceName:portName and source address. 
+ NextEndpoint(service string, port string, srcAddr net.Addr) (string, error) + NewService(service string, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error + CleanupStaleStickySessions(service string, port string) } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 245689cc134..e2b08522362 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -69,7 +69,8 @@ type tcpProxySocket struct { func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { for _, retryTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) + // TODO: support multiple service ports + endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -383,7 +384,8 @@ func (proxier *Proxier) ensurePortals() { func (proxier *Proxier) cleanupStaleStickySessions() { for name, info := range proxier.serviceMap { if info.sessionAffinityType != api.AffinityTypeNone { - proxier.loadBalancer.CleanupStaleStickySessions(name) + // TODO: support multiple service ports + proxier.loadBalancer.CleanupStaleStickySessions(name, "") } } } @@ -499,7 +501,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { if err != nil { glog.Errorf("Failed to open portal for %q: %v", service.Name, err) } - proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes) + // TODO: support multiple service ports + proxier.loadBalancer.NewService(service.Name, "", info.sessionAffinityType, info.stickyMaxAgeMinutes) } proxier.mu.Lock() defer proxier.mu.Unlock() diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 5215bb546eb..71c52b43924 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -197,7 +197,7 @@ func TestTCPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -217,7 +217,7 @@ func TestUDPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -246,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -277,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -308,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -338,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: 
"echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -368,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -407,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -446,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -482,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 070c82ee70f..384025774ef 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -18,6 +18,7 @@ package proxy import ( "errors" + "fmt" "net" "reflect" "strconv" @@ -34,62 +35,83 @@ var ( ErrMissingEndpoints = errors.New("missing endpoints") ) -type sessionAffinityDetail struct { - clientIPAddress string +type affinityState struct { + clientIP string //clientProtocol api.Protocol //not yet used //sessionCookie string //not yet used - endpoint string - lastUsedDTTM time.Time + endpoint string + lastUsed time.Time } -type serviceDetail struct { - name string - sessionAffinityType api.AffinityType - sessionAffinityMap map[string]*sessionAffinityDetail - stickyMaxAgeMinutes int +type affinityPolicy struct { + affinityType api.AffinityType + affinityMap map[string]*affinityState // map client IP -> affinity info + ttlMinutes int +} + +// balancerKey is a string that the balancer uses to key stored state. It is +// formatted as "service_name:port_name", but that should be opaque to most consumers. +type balancerKey string + +func makeBalancerKey(service, port string) balancerKey { + return balancerKey(fmt.Sprintf("%s:%s", service, port)) } // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { - lock sync.RWMutex - endpointsMap map[string][]string - rrIndex map[string]int - serviceDtlMap map[string]serviceDetail + lock sync.RWMutex + services map[balancerKey]*balancerState } -func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { - return &serviceDetail{ - name: service, - sessionAffinityType: sessionAffinityType, - sessionAffinityMap: make(map[string]*sessionAffinityDetail), - stickyMaxAgeMinutes: stickyMaxAgeMinutes, +// Ensure this implements LoadBalancer. 
+var _ LoadBalancer = &LoadBalancerRR{} + +type balancerState struct { + endpoints []string // a list of "ip:port" style strings + index int // index into endpoints + affinity affinityPolicy +} + +func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy { + return &affinityPolicy{ + affinityType: affinityType, + affinityMap: make(map[string]*affinityState), + ttlMinutes: ttlMinutes, } } // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - endpointsMap: make(map[string][]string), - rrIndex: make(map[string]int), - serviceDtlMap: make(map[string]serviceDetail), + services: map[balancerKey]*balancerState{}, } } -func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error { - if stickyMaxAgeMinutes == 0 { - stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? - } - if _, exists := lb.serviceDtlMap[service]; !exists { - lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) - glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service]) - } +func (lb *LoadBalancerRR) NewService(service, port string, affinityType api.AffinityType, ttlMinutes int) error { + lb.lock.Lock() + defer lb.lock.Unlock() + + lb.newServiceInternal(service, port, affinityType, ttlMinutes) return nil } -// return true if this service detail is using some form of session affinity. -func isSessionAffinity(serviceDtl serviceDetail) bool { - //Should never be empty string, but chekcing for it to be safe. - if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone { +func (lb *LoadBalancerRR) newServiceInternal(service, port string, affinityType api.AffinityType, ttlMinutes int) *balancerState { + if ttlMinutes == 0 { + ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? + } + + key := makeBalancerKey(service, port) + if _, exists := lb.services[key]; !exists { + lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} + glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service) + } + return lb.services[key] +} + +// return true if this service is using some form of session affinity. +func isSessionAffinity(affinity *affinityPolicy) bool { + // Should never be empty string, but checking for it to be safe. + if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone { return false } return true @@ -97,100 +119,111 @@ func isSessionAffinity(serviceDtl serviceDetail) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. -func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { - var ipaddr string - glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap) +func (lb *LoadBalancerRR) NextEndpoint(service, port string, srcAddr net.Addr) (string, error) { + // Coarse locking is simple. We can get more fine-grained if/when we + // can prove it matters. 
+ lb.lock.Lock() + defer lb.lock.Unlock() - lb.lock.RLock() - serviceDtls, exists := lb.serviceDtlMap[service] - endpoints, _ := lb.endpointsMap[service] - index := lb.rrIndex[service] - sessionAffinityEnabled := isSessionAffinity(serviceDtls) - - lb.lock.RUnlock() - if !exists { + key := makeBalancerKey(service, port) + state, exists := lb.services[key] + if !exists || state == nil { return "", ErrMissingServiceEntry } - if len(endpoints) == 0 { + if len(state.endpoints) == 0 { return "", ErrMissingEndpoints } + glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints) + + sessionAffinityEnabled := isSessionAffinity(&state.affinity) + + var ipaddr string if sessionAffinityEnabled { - if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { - ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) + // Caution: don't shadow ipaddr + var err error + ipaddr, _, err = net.SplitHostPort(srcAddr.String()) + if err != nil { + return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err) } - sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr] - glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) - if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes { + sessionAffinity, exists := state.affinity.affinityMap[ipaddr] + if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { + // Affinity wins. endpoint := sessionAffinity.endpoint - sessionAffinity.lastUsedDTTM = time.Now() - glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) + sessionAffinity.lastUsed = time.Now() + glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint) return endpoint, nil } } - endpoint := endpoints[index] - lb.lock.Lock() - lb.rrIndex[service] = (index + 1) % len(endpoints) + // Take the next endpoint. + endpoint := state.endpoints[state.index] + state.index = (state.index + 1) % len(state.endpoints) if sessionAffinityEnabled { - var affinity *sessionAffinityDetail - affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] + var affinity *affinityState + affinity = state.affinity.affinityMap[ipaddr] if affinity == nil { - affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} - lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity + affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()} + state.affinity.affinityMap[ipaddr] = affinity } - affinity.lastUsedDTTM = time.Now() + affinity.lastUsed = time.Now() affinity.endpoint = endpoint - affinity.clientIPAddress = ipaddr - - glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]) + affinity.clientIP = ipaddr + glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr]) } - lb.lock.Unlock() return endpoint, nil } -func isValidEndpoint(ep *api.Endpoint) bool { - return ep.IP != "" && ep.Port > 0 +type hostPortPair struct { + host string + port int } -func filterValidEndpoints(endpoints []api.Endpoint) []string { - // Convert Endpoint objects into strings for easier use later. Ignore - // the protocol field - we'll get that from the Service objects. 
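As an illustration (not part of this patch): stripped of locking and TTL bookkeeping, the selection logic in NextEndpoint above reduces to "serve a recently seen client its remembered endpoint, otherwise advance the round-robin cursor and remember the result". A minimal standalone sketch:

package main

import "fmt"

// balancerState is a cut-down stand-in for the per-key state in roundrobin.go.
type balancerState struct {
	endpoints []string          // "ip:port" strings
	index     int               // round-robin cursor
	affinity  map[string]string // client IP -> last endpoint handed out
}

func (s *balancerState) next(clientIP string, sticky bool) string {
	if sticky {
		if ep, ok := s.affinity[clientIP]; ok {
			return ep // affinity wins over round-robin
		}
	}
	ep := s.endpoints[s.index]
	s.index = (s.index + 1) % len(s.endpoints)
	if sticky {
		s.affinity[clientIP] = ep
	}
	return ep
}

func main() {
	s := &balancerState{
		endpoints: []string{"10.0.0.1:80", "10.0.0.2:80"},
		affinity:  map[string]string{},
	}
	fmt.Println(s.next("1.2.3.4", true)) // 10.0.0.1:80
	fmt.Println(s.next("1.2.3.4", true)) // 10.0.0.1:80 again: the sticky client is pinned
	fmt.Println(s.next("5.6.7.8", true)) // 10.0.0.2:80
}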
+func isValidEndpoint(hpp *hostPortPair) bool { + return hpp.host != "" && hpp.port > 0 +} + +func getValidEndpoints(pairs []hostPortPair) []string { + // Convert structs into strings for easier use later. var result []string - for i := range endpoints { - ep := &endpoints[i] - if isValidEndpoint(ep) { - result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + for i := range pairs { + hpp := &pairs[i] + if isValidEndpoint(hpp) { + result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) } } return result } -//remove any session affinity records associated to a particular endpoint (for example when a pod goes down). -func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) { - for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { - if affinityDetail.endpoint == endpoint { - glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) - delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress) +// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). +func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) { + for _, affinity := range state.affinity.affinityMap { + if affinity.endpoint == endpoint { + glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service) + delete(state.affinity.affinityMap, affinity.clientIP) } } } -//Loop through the valid endpoints and then the endpoints associated with the Load Balancer. -// Then remove any session affinity records that are not in both lists. -func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) { +// Loop through the valid endpoints and then the endpoints associated with the Load Balancer. +// Then remove any session affinity records that are not in both lists. +// This assumes the lb.lock is held. +func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) { allEndpoints := map[string]int{} - for _, validEndpoint := range validEndpoints { - allEndpoints[validEndpoint] = 1 + for _, newEndpoint := range newEndpoints { + allEndpoints[newEndpoint] = 1 } - for _, existingEndpoint := range lb.endpointsMap[service] { + state, exists := lb.services[service] + if !exists { + return + } + for _, existingEndpoint := range state.endpoints { allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 } for mKey, mVal := range allEndpoints { if mVal == 1 { - glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) - removeSessionAffinityByEndpoint(lb, service, mKey) - delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey) + glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service) + removeSessionAffinityByEndpoint(state, service, mKey) } } } @@ -198,44 +231,86 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints [ // OnUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { - registeredEndpoints := make(map[string]bool) +func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { + registeredEndpoints := make(map[balancerKey]bool) lb.lock.Lock() defer lb.lock.Unlock() - // Update endpoints for services. 
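As an illustration (not part of this patch): the new OnUpdate body just below regroups each service's per-IP port lists into one "ip:port" endpoint list per port name before touching balancer state. That regrouping step in isolation, with local stand-in types:

package main

import (
	"fmt"
	"net"
	"strconv"
)

type EndpointPort struct {
	Name     string
	Protocol string
	Port     int
}

type Endpoint struct {
	IP    string
	Ports []EndpointPort
}

type hostPortPair struct {
	host string
	port int
}

// explodePorts builds the portname -> ["ip:port", ...] view that the balancer
// keys its state on, ignoring the protocol field as the real code does.
func explodePorts(endpoints []Endpoint) map[string][]string {
	portsToEndpoints := map[string][]hostPortPair{}
	for i := range endpoints {
		ep := &endpoints[i]
		for j := range ep.Ports {
			epp := &ep.Ports[j]
			portsToEndpoints[epp.Name] = append(portsToEndpoints[epp.Name], hostPortPair{ep.IP, epp.Port})
		}
	}
	out := map[string][]string{}
	for name, pairs := range portsToEndpoints {
		for _, hpp := range pairs {
			out[name] = append(out[name], net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
		}
	}
	return out
}

func main() {
	eps := []Endpoint{
		{IP: "10.0.0.1", Ports: []EndpointPort{{Name: "http", Port: 80}, {Name: "metrics", Port: 9090}}},
		{IP: "10.0.0.2", Ports: []EndpointPort{{Name: "http", Port: 80}}},
	}
	fmt.Println(explodePorts(eps)) // map[http:[10.0.0.1:80 10.0.0.2:80] metrics:[10.0.0.1:9090]]
}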
- for _, endpoint := range endpoints { - existingEndpoints, exists := lb.endpointsMap[endpoint.Name] - validEndpoints := filterValidEndpoints(endpoint.Endpoints) - if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) { - glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) - updateServiceDetailMap(lb, endpoint.Name, validEndpoints) - // On update can be called without NewService being called externally. - // to be safe we will call it here. A new service will only be created - // if one does not already exist. - lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) - lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints) - // Reset the round-robin index. - lb.rrIndex[endpoint.Name] = 0 + // Update endpoints for services. + for i := range allEndpoints { + svcEndpoints := &allEndpoints[i] + + // We need to build a map of portname -> all ip:ports for that portname. + portsToEndpoints := map[string][]hostPortPair{} + + // Explode the Endpoints.Endpoints[*].Ports[*] into the aforementioned map. + // FIXME: this is awkward. Maybe a different factoring of Endpoints is better? + for j := range svcEndpoints.Endpoints { + ep := &svcEndpoints.Endpoints[j] + for k := range ep.Ports { + epp := &ep.Ports[k] + portsToEndpoints[epp.Name] = append(portsToEndpoints[epp.Name], hostPortPair{ep.IP, epp.Port}) + // Ignore the protocol field - we'll get that from the Service objects. + } + } + + for portname := range portsToEndpoints { + key := makeBalancerKey(svcEndpoints.Name, portname) + state, exists := lb.services[key] + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints + } + newEndpoints := getValidEndpoints(portsToEndpoints[portname]) + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) + lb.updateAffinityMap(key, newEndpoints) + // On update can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. + state = lb.newServiceInternal(svcEndpoints.Name, portname, api.AffinityTypeNone, 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 + } + registeredEndpoints[key] = true } - registeredEndpoints[endpoint.Name] = true } // Remove endpoints missing from the update. - for k, v := range lb.endpointsMap { + for k := range lb.services { if _, exists := registeredEndpoints[k]; !exists { - glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v) - delete(lb.endpointsMap, k) - delete(lb.serviceDtlMap, k) + glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k) + delete(lb.services, k) } } } -func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { - stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes - for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { - if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes { - glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. 
Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes)
- delete(lb.serviceDtlMap[service].sessionAffinityMap, key)
+// Tests whether two slices are equivalent. This sorts both slices in-place.
+func slicesEquiv(lhs, rhs []string) bool {
+ if len(lhs) != len(rhs) {
+ return false
+ }
+ if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
+ return true
+ }
+ return false
+}
+
+func (lb *LoadBalancerRR) CleanupStaleStickySessions(service, port string) {
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+
+ key := makeBalancerKey(service, port)
+ state, exists := lb.services[key]
+ if !exists {
+ glog.Warningf("CleanupStaleStickySessions called for non-existent service %q, port %q", service, port)
+ return
+ }
+ for ip, affinity := range state.affinity.affinityMap {
+ if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
+ glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
+ delete(state.affinity.affinityMap, ip)
+ }
+ }
 }
diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go
index 3298aa92bd5..0861c5ec72e 100644
--- a/pkg/proxy/roundrobin_test.go
+++ b/pkg/proxy/roundrobin_test.go
@@ -24,29 +24,29 @@ import (
 )
 func TestValidateWorks(t *testing.T) {
- if isValidEndpoint(&api.Endpoint{}) {
+ if isValidEndpoint(&hostPortPair{}) {
 t.Errorf("Didn't fail for empty string")
 }
- if isValidEndpoint(&api.Endpoint{IP: "foobar"}) {
+ if isValidEndpoint(&hostPortPair{host: "foobar"}) {
 t.Errorf("Didn't fail with no port")
 }
- if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) {
+ if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
 t.Errorf("Didn't fail with a negative port")
 }
- if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) {
+ if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
 t.Errorf("Failed a valid config.")
 }
 }
 func TestFilterWorks(t *testing.T) {
- endpoints := []api.Endpoint{
- {IP: "foobar", Port: 1},
- {IP: "foobar", Port: 2},
- {IP: "foobar", Port: -1},
- {IP: "foobar", Port: 3},
- {IP: "foobar", Port: -2},
+ endpoints := []hostPortPair{
+ {host: "foobar", port: 1},
+ {host: "foobar", port: 2},
+ {host: "foobar", port: -1},
+ {host: "foobar", port: 3},
+ {host: "foobar", port: -2},
 }
- filtered := filterValidEndpoints(endpoints)
+ filtered := getValidEndpoints(endpoints)
 if len(filtered) != 3 {
 t.Errorf("Failed to filter to the correct size")
@@ -66,7 +66,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
 loadBalancer := NewLoadBalancerRR()
 var endpoints []api.Endpoints
 loadBalancer.OnUpdate(endpoints)
- endpoint, err := loadBalancer.NextEndpoint("foo", nil)
+ endpoint, err := loadBalancer.NextEndpoint("foo", "http", nil)
 if err == nil {
 t.Errorf("Didn't fail with non-existent service")
 }
@@ -75,8 +75,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
 }
 }
-func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) {
- endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
+func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, port string, expected string, netaddr net.Addr) {
+ endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr)
 if err != nil {
 t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
 }
@@ -87,25 +87,41 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string,
 func TestLoadBalanceWorksWithSingleEndpoint(t 
*testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}}, + Endpoints: []api.Endpoint{{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 40}}}}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) +} + +func stringsInSlice(haystack []string, needles ...string) bool { + for _, needle := range needles { + found := false + for i := range haystack { + if haystack[i] == needle { + found = true + break + } + } + if found == false { + return false + } + } + return true } func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -113,22 +129,77 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) +} + +func TestLoadBalanceWorksWithMultipleEndpointsAndPorts(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: 
"foo"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 1}, + {Name: "q", Port: 10}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 2}, + {Name: "q", Port: 20}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 3}, + {Name: "q", Port: 30}, + }}, + }, + } + loadBalancer.OnUpdate(endpoints) + + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) } func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -136,37 +207,86 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 1}, + {Name: "q", Port: 10}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 2}, + {Name: "q", Port: 20}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 3}, + {Name: "q", Port: 30}, + }}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + 
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 8}, - {IP: "endpoint", Port: 9}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 8}, + {Name: "q", Port: 80}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 9}, + {Name: "q", Port: 90}, + }}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:8", "endpoint:9") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:80", "endpoint:90") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + endpoint, err = loadBalancer.NextEndpoint("foo", "q", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -174,7 +294,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -182,133 
+302,183 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 1}, + {Name: "q", Port: 10}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 2}, + {Name: "q", Port: 20}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 3}, + {Name: "q", Port: 30}, + }}, }, } endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 4}, - {IP: "endpoint", Port: 5}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 4}, + {Name: "q", Port: 40}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 5}, + {Name: "q", Port: 50}, + }}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledFooEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil) + + shuffledFooEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledFooEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { + t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil) + + shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints + if !stringsInSlice(shuffledBarEndpoints, "endpoint:4", "endpoint:5") { + t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints) + } + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, 
"bar", "p", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) + + shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "q")].endpoints + if !stringsInSlice(shuffledBarEndpoints, "endpoint:40", "endpoint:50") { + t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints) + } + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) } func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}}, - } - loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) -} - -func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { - client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} - client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} - client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} - loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) - if err == nil || len(endpoint) != 0 { - t.Errorf("Didn't fail with non-existent service") - } - - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, }, } 
loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2) } -func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { +func TestStickyLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeNone, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, 
loadBalancer, "foo", "p", shuffledEndpoints[0], client1) } -func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { +func TestStickyLoadBalanceWorksWithMultipleEndpointsStickyNone(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService("foo", "p", api.AffinityTypeNone, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + }, + } + loadBalancer.OnUpdate(endpoints) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1) +} + +func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} @@ -316,41 +486,44 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + 
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) client2Endpoint := shuffledEndpoints[1] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] } else if client2Endpoint == "endpoint:3" { @@ -358,26 +531,26 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } else if client3Endpoint == "endpoint:3" { client3Endpoint = shuffledEndpoints[0] } - expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1) + expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2) + expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 4}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6) + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1) + expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2) + expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client4) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client5) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client6) } func 
TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { @@ -385,51 +558,54 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 4}, - {IP: "endpoint", Port: 5}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", 
shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -440,58 +616,58 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } - loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("bar", "p", api.AffinityTypeClientIP, 0) endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 5}, - {IP: "endpoint", Port: 5}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 6}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) + shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) - shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, 
loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - shuffledBarEndpoints = loadBalancer.endpointsMap["bar"] - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) + shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "p")].endpoints + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) } diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 9616c654210..4db4c432da9 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -171,7 +171,7 @@ func TestControllerParsing(t *testing.T) { Containers: []api.Container{ { Image: "dockerfile/nginx", - Ports: []api.Port{ + Ports: []api.ContainerPort{ { ContainerPort: 80, HostPort: 8080, diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go index 99e00798844..7deea343ae2 100644 --- a/pkg/registry/endpoint/rest_test.go +++ b/pkg/registry/endpoint/rest_test.go @@ -27,10 +27,20 @@ import ( ) func TestGetEndpoints(t *testing.T) { + expected := []api.Endpoint{ + {IP: "127.0.0.1", Ports: []api.EndpointPort{ + {Name: "p", Port: 9000, Protocol: api.ProtocolTCP}, + {Name: "q", Port: 9000, Protocol: api.ProtocolUDP}, + }}, + {IP: "127.0.0.2", Ports: []api.EndpointPort{ + {Name: "p", Port: 8000, Protocol: api.ProtocolTCP}, + {Name: "q", Port: 8000, Protocol: api.ProtocolUDP}, + }}, + } registry := ®istrytest.ServiceRegistry{ Endpoints: api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + Endpoints: expected, }, } storage := NewREST(registry) @@ -39,7 +49,7 @@ func TestGetEndpoints(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %#v", err) } - if 
!reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) { + if !reflect.DeepEqual(expected, obj.(*api.Endpoints).Endpoints) { t.Errorf("unexpected endpoints: %#v", obj) } } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 2384fb76a3f..cc6d77cbc0e 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -24,6 +24,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" @@ -598,10 +600,10 @@ func TestEtcdListEndpoints(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Name: "p", Port: 8345, Protocol: api.ProtocolTCP}}}}}), }, { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}), }, }, }, @@ -625,8 +627,7 @@ func TestEtcdGetEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Protocol: "TCP", - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 34855, Protocol: api.ProtocolTCP}}}}, } key, _ := makeServiceEndpointsKey(ctx, "foo") @@ -647,10 +648,19 @@ func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true registry := NewTestEtcdRegistry(fakeClient) + + // TODO: Once we drop single-port APIs, make this test use the + // multi-port features. This will force a compile error when those APIs + // are deleted. 
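+ // The blank assignments below only keep the otherwise-unused v1beta1/v1beta2 imports in the build; they have no effect at runtime.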
+ _ = v1beta1.Dependency + _ = v1beta2.Dependency + endpoints := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Protocol: "TCP", - Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}}, + Endpoints: []api.Endpoint{ + {IP: "baz", Ports: []api.EndpointPort{{Port: 1, Protocol: api.ProtocolTCP}}}, + {IP: "bar", Ports: []api.EndpointPort{{Port: 2, Protocol: api.ProtocolTCP}}}, + }, } key, _ := makeServiceEndpointsKey(ctx, "foo") diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index cb9d1d4b4a1..449d99d9bf0 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -592,7 +592,7 @@ func TestResourceLocation(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.PodSpec{ Containers: []api.Container{ - {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, + {Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, }, }, }, @@ -604,7 +604,7 @@ func TestResourceLocation(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.PodSpec{ Containers: []api.Container{ - {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, + {Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, }, }, }, @@ -617,7 +617,7 @@ func TestResourceLocation(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ {Name: "ctr1"}, - {Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}}, + {Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, }, }, }, @@ -629,8 +629,8 @@ func TestResourceLocation(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.PodSpec{ Containers: []api.Container{ - {Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}}, - {Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}}, + {Name: "ctr1", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, + {Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 1234}}}, }, }, }, diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index c3690f61431..b000063e7a2 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -21,6 +21,7 @@ import ( "math/rand" "net" "strconv" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -218,17 +219,57 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { - eps, err := rs.registry.GetEndpoints(ctx, id) + // Allow ID as "svcname" or "svcname:port". Choose an endpoint at + // random. If the port is specified as a number, use that value + // directly. If the port is specified as a name, try to look up that + // name on the chosen endpoint. If port is not specified, try to use + // the first unnamed port on the chosen endpoint. If there are no + // unnamed ports, try to use the first defined port. + parts := strings.Split(id, ":") + if len(parts) > 2 { + return "", errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) + } + name := parts[0] + port := "" + if len(parts) == 2 { + port = parts[1] + } + + eps, err := rs.registry.GetEndpoints(ctx, name) if err != nil { return "", err } if len(eps.Endpoints) == 0 { - return "", fmt.Errorf("no endpoints available for %v", id) + return "", fmt.Errorf("no endpoints available for %v", name) } + ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] + + // Try to figure out a port. 
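+ // A numeric port is used as-is; a name (or "") is matched against the endpoint's port names; if the port is still empty after that, the first defined port is used.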
+ if _, err := strconv.Atoi(port); err == nil {
+ // Do nothing - port is correct as is.
+ } else {
+ // Try a name lookup, even if name is "".
+ for i := range ep.Ports {
+ if ep.Ports[i].Name == port {
+ port = strconv.Itoa(ep.Ports[i].Port)
+ break
+ }
+ }
+ }
+ if port == "" {
+ // Still nothing - try the first defined port.
+ if len(ep.Ports) > 0 {
+ port = strconv.Itoa(ep.Ports[0].Port)
+ }
+ }
+
 // We leave off the scheme ('http://') because we have no idea what sort of server
 // is listening at this endpoint.
- ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
- return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil
+ loc := ep.IP
+ if port != "" {
+ loc += fmt.Sprintf(":%s", port)
+ }
+ return loc, nil
 }
 func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {
diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go
index 8426ef4134a..e001abbb23f 100644
--- a/pkg/registry/service/rest_test.go
+++ b/pkg/registry/service/rest_test.go
@@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) {
 func TestServiceRegistryResourceLocation(t *testing.T) {
 ctx := api.NewDefaultContext()
 registry := registrytest.NewServiceRegistry()
- registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}}
+ registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Ports: []api.EndpointPort{{Port: 80}}}}}
 fakeCloud := &cloud.FakeCloud{}
 machines := []string{"foo", "bar", "baz"}
 storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 7cd296e1592..e0c652a76f2 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -60,9 +60,9 @@ func (st *schedulerTester) expectFailure(pod api.Pod) {
 }
 func newPod(host string, hostPorts ...int) api.Pod {
- networkPorts := []api.Port{}
+ networkPorts := []api.ContainerPort{}
 for _, port := range hostPorts {
- networkPorts = append(networkPorts, api.Port{HostPort: port})
+ networkPorts = append(networkPorts, api.ContainerPort{HostPort: port})
 }
 return api.Pod{
 Status: api.PodStatus{
diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go
index 3f42a9d0d8b..870880db3be 100644
--- a/pkg/service/endpoints_controller.go
+++ b/pkg/service/endpoints_controller.go
@@ -87,7 +87,11 @@ func (e *EndpointController) SyncServiceEndpoints() error {
 continue
 }
- endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port})
+ // TODO: Add multiple-ports to Service and expose them here. 
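+ // For now each pod contributes a single unnamed EndpointPort using the port found above and the service's protocol.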
+ endpoints = append(endpoints, api.Endpoint{ + IP: pod.Status.PodIP, + Ports: []api.EndpointPort{{Name: "", Protocol: service.Spec.Protocol, Port: port}}, + }) } currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) if err != nil { @@ -96,7 +100,6 @@ func (e *EndpointController) SyncServiceEndpoints() error { ObjectMeta: api.ObjectMeta{ Name: service.Name, }, - Protocol: service.Spec.Protocol, } } else { glog.Errorf("Error getting endpoints: %v", err) @@ -112,7 +115,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) { + if endpointsEqual(currentEndpoints, endpoints) { glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) continue } @@ -126,12 +129,27 @@ func (e *EndpointController) SyncServiceEndpoints() error { return resultErr } +// TODO: It would be nice if we had a util function that reflectively compared +// two slices for order-insensitive equivalence. +func portsEqual(lhs, rhs []api.EndpointPort) bool { + if len(lhs) != len(rhs) { + return false + } + for i := range lhs { + if lhs[i] != rhs[i] { + return false + } + } + return true +} + func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool { if haystack == nil || needle == nil { return false } for ix := range haystack.Endpoints { - if haystack.Endpoints[ix] == *needle { + haystackEP := &haystack.Endpoints[ix] + if haystackEP.IP == needle.IP && portsEqual(haystackEP.Ports, needle.Ports) { return true } } diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 9476f8df5ff..72336b53d88 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -39,7 +39,7 @@ func newPodList(count int) *api.PodList { Spec: api.PodSpec{ Containers: []api.Container{ { - Ports: []api.Port{ + Ports: []api.ContainerPort{ { ContainerPort: 8080, }, @@ -69,7 +69,7 @@ func TestFindPort(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ { - Ports: []api.Port{ + Ports: []api.ContainerPort{ { Name: "foo", ContainerPort: 8080, @@ -90,7 +90,7 @@ func TestFindPort(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ { - Ports: []api.Port{}, + Ports: []api.ContainerPort{}, }, }, }, @@ -245,8 +245,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -277,8 +276,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -309,8 +307,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolUDP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: 
[]api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolUDP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -340,7 +337,6 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, Endpoints: []api.Endpoint{}, }}) defer testServer.Close() @@ -354,8 +350,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data) } @@ -381,8 +376,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -395,8 +389,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data) } @@ -421,8 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -460,8 +452,7 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data) } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 9685898d6bb..f9f09587836 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -308,7 +308,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})), CreatedIndex: 1, ModifiedIndex: 2, }, @@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) { }, From: 1, Expected: []*T{ - {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: 
[]api.EndpointPort{{Port: 9000}}}}}, }, }, "from initial state": { @@ -343,7 +343,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})), CreatedIndex: 1, ModifiedIndex: 2, }, @@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) { }, Expected: []*T{ {watch.Added, nil}, - {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}}, }, }, } diff --git a/test/e2e/events.go b/test/e2e/events.go index 0f7f896f2ea..90f0de3a6b8 100644 --- a/test/e2e/events.go +++ b/test/e2e/events.go @@ -64,7 +64,7 @@ var _ = Describe("Events", func() { { Name: "p", Image: "kubernetes/serve_hostname", - Ports: []api.Port{{ContainerPort: 80}}, + Ports: []api.ContainerPort{{ContainerPort: 80}}, }, }, }, diff --git a/test/e2e/networking.go b/test/e2e/networking.go index 367d7300af4..34de4720cb9 100644 --- a/test/e2e/networking.go +++ b/test/e2e/networking.go @@ -109,7 +109,7 @@ var _ = Describe("Networking", func() { Name: "webserver", Image: "kubernetes/nettest:latest", Command: []string{"-service=" + name}, - Ports: []api.Port{{ContainerPort: 8080}}, + Ports: []api.ContainerPort{{ContainerPort: 8080}}, }, }, }, diff --git a/test/e2e/pods.go b/test/e2e/pods.go index 717e3e87929..89e8f8f3cf2 100644 --- a/test/e2e/pods.go +++ b/test/e2e/pods.go @@ -108,7 +108,7 @@ var _ = Describe("Pods", func() { { Name: "nginx", Image: "dockerfile/nginx", - Ports: []api.Port{{ContainerPort: 80}}, + Ports: []api.ContainerPort{{ContainerPort: 80}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ HTTPGet: &api.HTTPGetAction{ @@ -165,7 +165,7 @@ var _ = Describe("Pods", func() { { Name: "nginx", Image: "dockerfile/nginx", - Ports: []api.Port{{ContainerPort: 80}}, + Ports: []api.ContainerPort{{ContainerPort: 80}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ HTTPGet: &api.HTTPGetAction{ @@ -236,7 +236,7 @@ var _ = Describe("Pods", func() { { Name: "srv", Image: "kubernetes/serve_hostname", - Ports: []api.Port{{ContainerPort: 9376}}, + Ports: []api.ContainerPort{{ContainerPort: 9376}}, }, }, }, diff --git a/test/e2e/rc.go b/test/e2e/rc.go index cb9e8a3a61c..d0da049edbc 100644 --- a/test/e2e/rc.go +++ b/test/e2e/rc.go @@ -86,7 +86,7 @@ func ServeImageOrFail(c *client.Client, test string, image string) { { Name: name, Image: image, - Ports: []api.Port{{ContainerPort: 9376, HostPort: 8080}}, + Ports: []api.ContainerPort{{ContainerPort: 9376, HostPort: 8080}}, }, }, }, diff --git a/test/e2e/service.go b/test/e2e/service.go index 7a9ed995947..878174c0615 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -250,8 +250,11 @@ var _ = Describe("Services", func() { func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) { ips := util.StringSet{} for _, ep := range endpoints.Endpoints { - if ep.Port != expectedPort { - Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port)) + if len(ep.Ports) == 0 { + Fail(fmt.Sprintf("invalid endpoint, no ports")) + } + if ep.Ports[0].Port != expectedPort { + Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, 
ep.Ports[0].Port)) } ips.Insert(ep.IP) } @@ -296,7 +299,7 @@ func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]s { Name: "test", Image: "kubernetes/pause", - Ports: []api.Port{{ContainerPort: 80}}, + Ports: []api.ContainerPort{{ContainerPort: 80}}, }, }, },
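
Note: with this change an api.Endpoint no longer carries a single Port field; callers pick an entry out of Ports, by name when one matches and otherwise falling back to the first defined port, similar in spirit to the ResourceLocation and validateIPsOrFail changes above. The sketch below illustrates that lookup on its own. It uses simplified stand-in types rather than the real api package, and the helper name resolvePort is hypothetical, not part of this diff.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// Simplified stand-ins for api.Endpoint / api.EndpointPort, for illustration only.
type EndpointPort struct {
	Name     string
	Protocol string
	Port     int
}

type Endpoint struct {
	IP    string
	Ports []EndpointPort
}

// resolvePort picks a port from ep: the one whose name matches, if any,
// otherwise the first defined port. ok is false when ep has no ports at all.
func resolvePort(ep Endpoint, name string) (port int, ok bool) {
	for _, p := range ep.Ports {
		if p.Name == name {
			return p.Port, true
		}
	}
	if len(ep.Ports) > 0 {
		return ep.Ports[0].Port, true
	}
	return 0, false
}

func main() {
	ep := Endpoint{
		IP:    "10.0.0.1",
		Ports: []EndpointPort{{Name: "", Protocol: "TCP", Port: 9376}},
	}
	if port, ok := resolvePort(ep, ""); ok {
		// Prints "10.0.0.1:9376" - the same host:port string the old
		// single-port code built with net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)).
		fmt.Println(net.JoinHostPort(ep.IP, strconv.Itoa(port)))
	}
}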