diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index bdd6b46d308..ed4b1a0e15f 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -17,6 +17,7 @@ limitations under the License. package testing import ( + "fmt" "math/rand" "strconv" "testing" @@ -233,6 +234,11 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { s.Type = api.SecretTypeOpaque }, + func(ep *api.Endpoint, c fuzz.Continue) { + // TODO: If our API used a particular type for IP fields we could just catch that here. + ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256)) + ep.Port = c.Rand.Intn(65536) + }, ) return f } diff --git a/pkg/api/types.go b/pkg/api/types.go index 6ccf553933f..b4a7b5f609b 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -745,12 +745,25 @@ type Service struct { } // Endpoints is a collection of endpoints that implement the actual service, for example: -// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] +// Name: "mysql", Endpoints: [{"ip": "10.10.1.1", "port": 1909}, {"ip": "10.10.2.2", "port": 8834}] type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` - Endpoints []string `json:"endpoints,omitempty"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` + Endpoints []Endpoint `json:"endpoints,omitempty"` +} + +// Endpoint is a single IP endpoint of a service. +type Endpoint struct { + // Required: The IP of this endpoint. + // TODO: This should allow hostname or IP, see #4447. + IP string `json:"ip"` + + // Required: The destination port to access. + Port int `json:"port"` } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index cc69d0293f8..2f178b4de8e 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -18,6 +18,7 @@ package v1beta1 import ( "fmt" + "net" "strconv" newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -1113,6 +1114,48 @@ func init() { out.TimeoutSeconds = in.TimeoutSeconds return nil }, + func(in *newer.Endpoints, out *Endpoints, s conversion.Scope) error { + if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } + for i := range in.Endpoints { + ep := &in.Endpoints[i] + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + } + return nil + }, + func(in *Endpoints, out *newer.Endpoints, s conversion.Scope) error { + if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } + for i := range in.Endpoints { + out.Endpoints = append(out.Endpoints, newer.Endpoint{}) + ep := &out.Endpoints[i] + host, port, err := net.SplitHostPort(in.Endpoints[i]) + if err != nil { + return err + } + ep.IP = host + pn, err := strconv.Atoi(port) + if err != nil { + return err + } + ep.Port = pn + } + return nil + }, ) if err != nil { // If one of the conversion functions is malformed, detect it immediately. diff --git a/pkg/api/v1beta1/conversion_test.go b/pkg/api/v1beta1/conversion_test.go index fd6a6df0455..df0009b4198 100644 --- a/pkg/api/v1beta1/conversion_test.go +++ b/pkg/api/v1beta1/conversion_test.go @@ -382,3 +382,72 @@ func TestContainerConversion(t *testing.T) { } } } + +func TestEndpointsConversion(t *testing.T) { + testCases := []struct { + given current.Endpoints + expected newer.Endpoints + }{ + { + given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "empty", + }, + Protocol: current.ProtocolTCP, + Endpoints: []string{}, + }, + expected: newer.Endpoints{ + Protocol: newer.ProtocolTCP, + Endpoints: []newer.Endpoint{}, + }, + }, + { + given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "one", + }, + Protocol: current.ProtocolTCP, + Endpoints: []string{"1.2.3.4:88"}, + }, + expected: newer.Endpoints{ + Protocol: newer.ProtocolTCP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, + }, + }, + { + given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "several", + }, + Protocol: current.ProtocolUDP, + Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, + }, + expected: newer.Endpoints{ + Protocol: newer.ProtocolUDP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, + }, + }, + } + + for i, tc := range testCases { + // Convert versioned -> internal. + got := newer.Endpoints{} + if err := Convert(&tc.given, &got); err != nil { + t.Errorf("[Case: %d] Unexpected error: %v", i, err) + continue + } + if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) + } + + // Convert internal -> versioned. + got2 := current.Endpoints{} + if err := Convert(&got, &got2); err != nil { + t.Errorf("[Case: %d] Unexpected error: %v", i, err) + continue + } + if got2.Protocol != tc.given.Protocol || !newer.Semantic.DeepEqual(got2.Endpoints, tc.given.Endpoints) { + t.Errorf("[Case: %d] Expected %v, got %v", i, tc.given, got2) + } + } +} diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go index 713458902c2..c833f9ffb60 100644 --- a/pkg/api/v1beta1/defaults.go +++ b/pkg/api/v1beta1/defaults.go @@ -85,5 +85,10 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *Endpoints) { + if obj.Protocol == "" { + obj.Protocol = "TCP" + } + }, ) } diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go index bef10ab9016..d7372045247 100644 --- a/pkg/api/v1beta1/defaults_test.go +++ b/pkg/api/v1beta1/defaults_test.go @@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) { t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type) } } + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + } +} diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index ec75dc6c110..929ec181dd9 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -604,7 +604,10 @@ type Service struct { // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { - TypeMeta `json:",inline"` + TypeMeta `json:",inline"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"` Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"` } diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index df63f91abe3..9c2d73058bd 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -18,6 +18,7 @@ package v1beta2 import ( "fmt" + "net" "strconv" newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -1028,6 +1029,48 @@ func init() { out.TimeoutSeconds = in.TimeoutSeconds return nil }, + func(in *newer.Endpoints, out *Endpoints, s conversion.Scope) error { + if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } + for i := range in.Endpoints { + ep := &in.Endpoints[i] + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + } + return nil + }, + func(in *Endpoints, out *newer.Endpoints, s conversion.Scope) error { + if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { + return err + } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } + for i := range in.Endpoints { + out.Endpoints = append(out.Endpoints, newer.Endpoint{}) + ep := &out.Endpoints[i] + host, port, err := net.SplitHostPort(in.Endpoints[i]) + if err != nil { + return err + } + ep.IP = host + pn, err := strconv.Atoi(port) + if err != nil { + return err + } + ep.Port = pn + } + return nil + }, ) if err != nil { // If one of the conversion functions is malformed, detect it immediately. diff --git a/pkg/api/v1beta2/conversion_test.go b/pkg/api/v1beta2/conversion_test.go index 2b48cf5eda1..b606476466d 100644 --- a/pkg/api/v1beta2/conversion_test.go +++ b/pkg/api/v1beta2/conversion_test.go @@ -212,3 +212,72 @@ func TestContainerConversion(t *testing.T) { } } } + +func TestEndpointsConversion(t *testing.T) { + testCases := []struct { + given current.Endpoints + expected newer.Endpoints + }{ + { + given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "empty", + }, + Protocol: current.ProtocolTCP, + Endpoints: []string{}, + }, + expected: newer.Endpoints{ + Protocol: newer.ProtocolTCP, + Endpoints: []newer.Endpoint{}, + }, + }, + { + given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "one", + }, + Protocol: current.ProtocolTCP, + Endpoints: []string{"1.2.3.4:88"}, + }, + expected: newer.Endpoints{ + Protocol: newer.ProtocolTCP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, + }, + }, + { + given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "several", + }, + Protocol: current.ProtocolUDP, + Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, + }, + expected: newer.Endpoints{ + Protocol: newer.ProtocolUDP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, + }, + }, + } + + for i, tc := range testCases { + // Convert versioned -> internal. + got := newer.Endpoints{} + if err := newer.Scheme.Convert(&tc.given, &got); err != nil { + t.Errorf("[Case: %d] Unexpected error: %v", i, err) + continue + } + if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) + } + + // Convert internal -> versioned. + got2 := current.Endpoints{} + if err := newer.Scheme.Convert(&got, &got2); err != nil { + t.Errorf("[Case: %d] Unexpected error: %v", i, err) + continue + } + if got2.Protocol != tc.given.Protocol || !newer.Semantic.DeepEqual(got2.Endpoints, tc.given.Endpoints) { + t.Errorf("[Case: %d] Expected %v, got %v", i, tc.given, got2) + } + } +} diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go index ef0abf4100a..9242bf3a323 100644 --- a/pkg/api/v1beta2/defaults.go +++ b/pkg/api/v1beta2/defaults.go @@ -87,5 +87,10 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *Endpoints) { + if obj.Protocol == "" { + obj.Protocol = "TCP" + } + }, ) } diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go index fb14920244d..a73c83a93e6 100644 --- a/pkg/api/v1beta2/defaults_test.go +++ b/pkg/api/v1beta2/defaults_test.go @@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) { t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type) } } + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + } +} diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 8e6793be052..314aa4f9d51 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -568,7 +568,10 @@ type Service struct { // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { - TypeMeta `json:",inline"` + TypeMeta `json:",inline"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"` Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"` } diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index 4abfa2c8178..3149601fb64 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -80,5 +80,10 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *Endpoints) { + if obj.Protocol == "" { + obj.Protocol = "TCP" + } + }, ) } diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index 370e3f574c4..e1ebf388b1e 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) { t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type) } } + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + } +} diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 5293ee5c6a0..45408842681 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -777,13 +777,26 @@ type ServiceList struct { } // Endpoints is a collection of endpoints that implement the actual service, for example: -// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] +// Name: "mysql", Endpoints: [{"ip": "10.10.1.1", "port": 1909}, {"ip": "10.10.2.2", "port": 8834}] type Endpoints struct { TypeMeta `json:",inline"` - ObjectMeta `json:"metadata"` + ObjectMeta `json:"metadata,omitempty"` - // Endpoints is the list of host ports that satisfy the service selector - Endpoints []string `json:"endpoints"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` + + Endpoints []Endpoint `json:"endpoints,omitempty"` +} + +// Endpoint is a single IP endpoint of a service. +type Endpoint struct { + // Required: The IP of this endpoint. + // TODO: This should allow hostname or IP, see #4447. + IP string `json:"ip"` + + // Required: The destination port to access. + Port int `json:"port"` } // EndpointsList is a list of endpoints. diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 2ddb20fab02..096b887dbcf 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -615,7 +615,8 @@ func TestListEndpooints(t *testing.T) { Items: []api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "endpoint-1"}, - Endpoints: []string{"10.245.1.2:8080", "10.245.1.3:8080"}, + Endpoints: []api.Endpoint{ + {IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}}, }, }, }, diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index 2c62ab9d42c..8e1613b3ddc 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -269,7 +269,7 @@ func (d *ServiceDescriber) Describe(namespace, name string) (string, error) { fmt.Fprintf(out, "Labels:\t%s\n", formatLabels(service.Labels)) fmt.Fprintf(out, "Selector:\t%s\n", formatLabels(service.Spec.Selector)) fmt.Fprintf(out, "Port:\t%d\n", service.Spec.Port) - fmt.Fprintf(out, "Endpoints:\t%s\n", stringList(endpoints.Endpoints)) + fmt.Fprintf(out, "Endpoints:\t%s\n", formatEndpoints(endpoints.Endpoints)) if events != nil { describeEvents(events, out) } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index 652103f0313..330ebe721fc 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -22,8 +22,10 @@ import ( "fmt" "io" "io/ioutil" + "net" "reflect" "sort" + "strconv" "strings" "text/tabwriter" "text/template" @@ -263,10 +265,15 @@ func (h *HumanReadablePrinter) printHeader(columnNames []string, w io.Writer) er return nil } -func stringList(list []string) string { - if len(list) == 0 { +func formatEndpoints(endpoints []api.Endpoint) string { + if len(endpoints) == 0 { return "" } + list := []string{} + for i := range endpoints { + ep := &endpoints[i] + list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + } return strings.Join(list, ",") } @@ -368,7 +375,7 @@ func printServiceList(list *api.ServiceList, w io.Writer) error { } func printEndpoints(endpoint *api.Endpoints, w io.Writer) error { - _, err := fmt.Fprintf(w, "%s\t%s\n", endpoint.Name, stringList(endpoint.Endpoints)) + _, err := fmt.Fprintf(w, "%s\t%s\n", endpoint.Name, formatEndpoints(endpoint.Endpoints)) return err } diff --git a/pkg/kubectl/resource_printer_test.go b/pkg/kubectl/resource_printer_test.go index f1a57f4747f..cc6f7ab6b6b 100644 --- a/pkg/kubectl/resource_printer_test.go +++ b/pkg/kubectl/resource_printer_test.go @@ -452,7 +452,7 @@ func TestPrinters(t *testing.T) { "pod": &api.Pod{ObjectMeta: om("pod")}, "emptyPodList": &api.PodList{}, "nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}}, - "endpoints": &api.Endpoints{Endpoints: []string{"127.0.0.1", "localhost:8080"}}, + "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}}, } // map of printer name to set of objects it should fail on. expectedErrors := map[string]util.StringSet{ diff --git a/pkg/master/publish.go b/pkg/master/publish.go index ced152954b9..7cc73559965 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -18,7 +18,6 @@ package master import ( "net" - "strconv" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -40,7 +39,7 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { if err := m.createMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil { glog.Errorf("Can't create rw service: %v", err) } - if err := m.ensureEndpointsContain("kubernetes", net.JoinHostPort(m.publicIP.String(), strconv.Itoa(int(m.publicReadWritePort)))); err != nil { + if err := m.ensureEndpointsContain("kubernetes", m.publicIP, m.publicReadWritePort); err != nil { glog.Errorf("Can't create rw endpoints: %v", err) } } @@ -65,7 +64,7 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) { if err := m.createMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil { glog.Errorf("Can't create ro service: %v", err) } - if err := m.ensureEndpointsContain("kubernetes-ro", net.JoinHostPort(m.publicIP.String(), strconv.Itoa(int(m.publicReadOnlyPort)))); err != nil { + if err := m.ensureEndpointsContain("kubernetes-ro", m.publicIP, m.publicReadOnlyPort); err != nil { glog.Errorf("Can't create ro endpoints: %v", err) } } @@ -126,26 +125,28 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I // excess endpoints (as determined by m.masterCount). Extra endpoints could appear // in the list if, for example, the master starts running on a different machine, // changing IP addresses. -func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error { +func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) - if err != nil { + if err != nil || e.Protocol != api.ProtocolTCP { e = &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: serviceName, Namespace: api.NamespaceDefault, }, + Protocol: api.ProtocolTCP, } } found := false for i := range e.Endpoints { - if e.Endpoints[i] == endpoint { + ep := &e.Endpoints[i] + if ep.IP == ip.String() && ep.Port == port { found = true break } } if !found { - e.Endpoints = append(e.Endpoints, endpoint) + e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port}) } if len(e.Endpoints) > m.masterCount { // We append to the end and remove from the beginning, so this should diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 11d34e47e40..29489f839f3 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -183,7 +183,10 @@ func TestServicesFromZeroError(t *testing.T) { } func TestEndpoints(t *testing.T) { - endpoint := api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}} + endpoint := api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + } fakeWatch := watch.NewFake() fakeClient := &client.Fake{Watch: fakeWatch} @@ -230,7 +233,10 @@ func TestEndpoints(t *testing.T) { } func TestEndpointsFromZero(t *testing.T) { - endpoint := api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}} + endpoint := api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + } fakeWatch := watch.NewFake() fakeWatch.Stop() diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 5321a9cd106..3481266d58c 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -218,11 +218,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. config.RegisterHandler(handler2) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint1", "endpoint2"}, + Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}}, }) endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, - Endpoints: []string{"endpoint3", "endpoint4"}, + Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}}, }) handler.Wait(2) handler2.Wait(2) @@ -244,11 +244,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t config.RegisterHandler(handler2) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint1", "endpoint2"}, + Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}}, }) endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, - Endpoints: []string{"endpoint3", "endpoint4"}, + Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}}, }) handler.Wait(2) handler2.Wait(2) @@ -262,7 +262,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t // Add one more endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foobar"}, - Endpoints: []string{"endpoint5", "endpoint6"}, + Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}}, }) handler.Wait(1) handler2.Wait(1) @@ -274,7 +274,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t // Update the "foo" service with new endpoints endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint77"}, + Endpoints: []api.Endpoint{{IP: "endpoint7"}}, }) handler.Wait(1) handler2.Wait(1) diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 70050965714..5215bb546eb 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "strconv" "sync/atomic" "testing" "time" @@ -102,8 +103,8 @@ func (fake *fakeIptables) IsIpv6() bool { return false } -var tcpServerPort string -var udpServerPort string +var tcpServerPort int +var udpServerPort int func init() { // Don't handle panics @@ -118,20 +119,28 @@ func init() { if err != nil { panic(fmt.Sprintf("failed to parse: %v", err)) } - _, tcpServerPort, err = net.SplitHostPort(u.Host) + _, port, err := net.SplitHostPort(u.Host) if err != nil { panic(fmt.Sprintf("failed to parse: %v", err)) } + tcpServerPort, err = strconv.Atoi(port) + if err != nil { + panic(fmt.Sprintf("failed to atoi(%s): %v", port, err)) + } // UDP setup. udp, err := newUDPEchoServer() if err != nil { panic(fmt.Sprintf("failed to make a UDP server: %v", err)) } - _, udpServerPort, err = net.SplitHostPort(udp.LocalAddr().String()) + _, port, err = net.SplitHostPort(udp.LocalAddr().String()) if err != nil { panic(fmt.Sprintf("failed to parse: %v", err)) } + udpServerPort, err = strconv.Atoi(port) + if err != nil { + panic(fmt.Sprintf("failed to atoi(%s): %v", port, err)) + } go udp.Loop() } @@ -188,7 +197,7 @@ func TestTCPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -208,7 +217,7 @@ func TestUDPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -237,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -268,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -299,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -329,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -359,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -398,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -437,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -473,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 8d4adbcbbd7..070c82ee70f 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -149,23 +149,18 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string return endpoint, nil } -func isValidEndpoint(spec string) bool { - _, port, err := net.SplitHostPort(spec) - if err != nil { - return false - } - value, err := strconv.Atoi(port) - if err != nil { - return false - } - return value > 0 +func isValidEndpoint(ep *api.Endpoint) bool { + return ep.IP != "" && ep.Port > 0 } -func filterValidEndpoints(endpoints []string) []string { +func filterValidEndpoints(endpoints []api.Endpoint) []string { + // Convert Endpoint objects into strings for easier use later. Ignore + // the protocol field - we'll get that from the Service objects. var result []string - for _, spec := range endpoints { - if isValidEndpoint(spec) { - result = append(result, spec) + for i := range endpoints { + ep := &endpoints[i] + if isValidEndpoint(ep) { + result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) } } return result diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index 29bac114fbc..3298aa92bd5 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -24,22 +24,28 @@ import ( ) func TestValidateWorks(t *testing.T) { - if isValidEndpoint("") { + if isValidEndpoint(&api.Endpoint{}) { t.Errorf("Didn't fail for empty string") } - if isValidEndpoint("foobar") { + if isValidEndpoint(&api.Endpoint{IP: "foobar"}) { t.Errorf("Didn't fail with no port") } - if isValidEndpoint("foobar:-1") { + if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) { t.Errorf("Didn't fail with a negative port") } - if !isValidEndpoint("foobar:8080") { + if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) { t.Errorf("Failed a valid config.") } } func TestFilterWorks(t *testing.T) { - endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"} + endpoints := []api.Endpoint{ + {IP: "foobar", Port: 1}, + {IP: "foobar", Port: 2}, + {IP: "foobar", Port: -1}, + {IP: "foobar", Port: 3}, + {IP: "foobar", Port: -2}, + } filtered := filterValidEndpoints(endpoints) if len(filtered) != 3 { @@ -88,7 +94,7 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint1:40"}, + Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}}, } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) @@ -106,7 +112,11 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints := loadBalancer.endpointsMap["foo"] @@ -125,7 +135,11 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints := loadBalancer.endpointsMap["foo"] @@ -137,7 +151,10 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:8", "endpoint:9"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 8}, + {IP: "endpoint", Port: 9}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints = loadBalancer.endpointsMap["foo"] @@ -146,7 +163,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}} + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint("foo", nil) @@ -164,11 +181,18 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, - Endpoints: []string{"endpoint:4", "endpoint:5"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 4}, + {IP: "endpoint", Port: 5}, + }, } loadBalancer.OnUpdate(endpoints) shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] @@ -211,7 +235,7 @@ func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1"}, + Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}}, } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) @@ -234,7 +258,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints := loadBalancer.endpointsMap["foo"] @@ -262,7 +290,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints := loadBalancer.endpointsMap["foo"] @@ -293,7 +325,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints := loadBalancer.endpointsMap["foo"] @@ -308,7 +344,10 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints = loadBalancer.endpointsMap["foo"] @@ -325,7 +364,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:4"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 4}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints = loadBalancer.endpointsMap["foo"] @@ -351,7 +394,11 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints := loadBalancer.endpointsMap["foo"] @@ -365,7 +412,10 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:4", "endpoint:5"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 4}, + {IP: "endpoint", Port: 5}, + }, } loadBalancer.OnUpdate(endpoints) shuffledEndpoints = loadBalancer.endpointsMap["foo"] @@ -376,7 +426,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}} + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint("foo", nil) @@ -398,12 +448,19 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, + }, } loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, - Endpoints: []string{"endpoint:4", "endpoint:5"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Port: 5}, + {IP: "endpoint", Port: 5}, + }, } loadBalancer.OnUpdate(endpoints) shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go index 79803c1ce20..99e00798844 100644 --- a/pkg/registry/endpoint/rest_test.go +++ b/pkg/registry/endpoint/rest_test.go @@ -30,7 +30,7 @@ func TestGetEndpoints(t *testing.T) { registry := ®istrytest.ServiceRegistry{ Endpoints: api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"127.0.0.1:9000"}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, }, } storage := NewREST(registry) @@ -39,7 +39,7 @@ func TestGetEndpoints(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %#v", err) } - if !reflect.DeepEqual([]string{"127.0.0.1:9000"}, obj.(*api.Endpoints).Endpoints) { + if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) { t.Errorf("unexpected endpoints: %#v", obj) } } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 83f47e52b96..2384fb76a3f 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -598,10 +598,10 @@ func TestEtcdListEndpoints(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}), }, { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), }, }, }, @@ -625,7 +625,8 @@ func TestEtcdGetEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"127.0.0.1:34855"}, + Protocol: "TCP", + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}}, } key, _ := makeServiceEndpointsKey(ctx, "foo") @@ -648,7 +649,8 @@ func TestEtcdUpdateEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []string{"baz", "bar"}, + Protocol: "TCP", + Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}}, } key, _ := makeServiceEndpointsKey(ctx, "foo") diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 9d7e958edb6..c3690f61431 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -20,6 +20,7 @@ import ( "fmt" "math/rand" "net" + "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -217,16 +218,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { - e, err := rs.registry.GetEndpoints(ctx, id) + eps, err := rs.registry.GetEndpoints(ctx, id) if err != nil { return "", err } - if len(e.Endpoints) == 0 { + if len(eps.Endpoints) == 0 { return "", fmt.Errorf("no endpoints available for %v", id) } // We leave off the scheme ('http://') because we have no idea what sort of server // is listening at this endpoint. - return e.Endpoints[rand.Intn(len(e.Endpoints))], nil + ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] + return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil } func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error { diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 170c8a3632a..8426ef4134a 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) { func TestServiceRegistryResourceLocation(t *testing.T) { ctx := api.NewDefaultContext() registry := registrytest.NewServiceRegistry() - registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} + registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 82c1ce2a0d8..3f42a9d0d8b 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -18,8 +18,6 @@ package service import ( "fmt" - "net" - "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -64,7 +62,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { resultErr = err continue } - endpoints := []string{} + endpoints := []api.Endpoint{} for _, pod := range pods.Items { port, err := findPort(&pod, service.Spec.ContainerPort) @@ -89,7 +87,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { continue } - endpoints = append(endpoints, net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(port))) + endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port}) } currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) if err != nil { @@ -98,6 +96,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { ObjectMeta: api.ObjectMeta{ Name: service.Name, }, + Protocol: service.Spec.Protocol, } } else { glog.Errorf("Error getting endpoints: %v", err) @@ -113,8 +112,8 @@ func (e *EndpointController) SyncServiceEndpoints() error { _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - if endpointsEqual(currentEndpoints, endpoints) { - glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) { + glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) continue } _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) @@ -127,24 +126,24 @@ func (e *EndpointController) SyncServiceEndpoints() error { return resultErr } -func containsEndpoint(endpoints *api.Endpoints, endpoint string) bool { - if endpoints == nil { +func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool { + if haystack == nil || needle == nil { return false } - for ix := range endpoints.Endpoints { - if endpoints.Endpoints[ix] == endpoint { + for ix := range haystack.Endpoints { + if haystack.Endpoints[ix] == *needle { return true } } return false } -func endpointsEqual(e *api.Endpoints, endpoints []string) bool { - if len(e.Endpoints) != len(endpoints) { +func endpointsEqual(eps *api.Endpoints, endpoints []api.Endpoint) bool { + if len(eps.Endpoints) != len(endpoints) { return false } - for _, endpoint := range endpoints { - if !containsEndpoint(e, endpoint) { + for i := range endpoints { + if !containsEndpoint(eps, &endpoints[i]) { return false } } diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 3fcf227f602..9476f8df5ff 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -245,7 +245,72 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []string{"6.7.8.9:1000"}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + endpoints := NewEndpointController(client) + if err := endpoints.SyncServiceEndpoints(); err != nil { + t.Errorf("unexpected error: %v", err) + } + endpointsHandler.ValidateRequestCount(t, 0) +} + +func TestSyncEndpointsProtocolTCP(t *testing.T) { + serviceList := api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Protocol: api.ProtocolTCP, + }, + }, + }, + } + testServer, endpointsHandler := makeTestServer(t, + serverResponse{http.StatusOK, newPodList(0)}, + serverResponse{http.StatusOK, &serviceList}, + serverResponse{http.StatusOK, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + ResourceVersion: "1", + }, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + endpoints := NewEndpointController(client) + if err := endpoints.SyncServiceEndpoints(); err != nil { + t.Errorf("unexpected error: %v", err) + } + endpointsHandler.ValidateRequestCount(t, 0) +} + +func TestSyncEndpointsProtocolUDP(t *testing.T) { + serviceList := api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Protocol: api.ProtocolUDP, + }, + }, + }, + } + testServer, endpointsHandler := makeTestServer(t, + serverResponse{http.StatusOK, newPodList(0)}, + serverResponse{http.StatusOK, &serviceList}, + serverResponse{http.StatusOK, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + ResourceVersion: "1", + }, + Protocol: api.ProtocolUDP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -275,7 +340,8 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []string{}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -288,7 +354,8 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []string{"1.2.3.4:8080"}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data) } @@ -314,7 +381,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []string{"6.7.8.9:1000"}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -327,7 +395,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []string{"1.2.3.4:8080"}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data) } @@ -352,7 +421,8 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Endpoints: []string{"1.2.3.4:8080"}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -390,7 +460,8 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "", }, - Endpoints: []string{"1.2.3.4:8080"}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data) } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 16602ec7ac3..9685898d6bb 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -280,7 +280,7 @@ func TestWatchEtcdState(t *testing.T) { codec := latest.Codec type T struct { Type watch.EventType - Endpoints []string + Endpoints []api.Endpoint } testCases := map[string]struct { Initial map[string]EtcdResponseWithError @@ -294,7 +294,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "create", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})), }, }, }, @@ -308,12 +308,12 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), CreatedIndex: 1, ModifiedIndex: 2, }, PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})), CreatedIndex: 1, ModifiedIndex: 1, }, @@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) { }, From: 1, Expected: []*T{ - {watch.Modified, []string{"127.0.0.1:9000"}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, }, }, "from initial state": { @@ -330,7 +330,7 @@ func TestWatchEtcdState(t *testing.T) { R: &etcd.Response{ Action: "get", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})), CreatedIndex: 1, ModifiedIndex: 1, }, @@ -343,12 +343,12 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), CreatedIndex: 1, ModifiedIndex: 2, }, PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})), CreatedIndex: 1, ModifiedIndex: 1, }, @@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) { }, Expected: []*T{ {watch.Added, nil}, - {watch.Modified, []string{"127.0.0.1:9000"}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, }, }, } diff --git a/test/e2e/service.go b/test/e2e/service.go index 97ef7d1b66b..7a9ed995947 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -18,7 +18,6 @@ package e2e import ( "fmt" - "net" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -207,7 +206,7 @@ var _ = Describe("Services", func() { } _, err := c.Services(ns).Create(service) Expect(err).NotTo(HaveOccurred()) - expectedPort := "80" + expectedPort := 80 validateEndpointsOrFail(c, ns, serviceName, expectedPort, []string{}) @@ -248,17 +247,13 @@ var _ = Describe("Services", func() { }, 120.0) }) -func validateIPsOrFail(c *client.Client, ns, expectedPort string, expectedEndpoints []string, endpoints *api.Endpoints) { +func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) { ips := util.StringSet{} - for _, spec := range endpoints.Endpoints { - host, port, err := net.SplitHostPort(spec) - if err != nil { - Fail(fmt.Sprintf("invalid endpoint spec: %s (%v)", spec, err)) + for _, ep := range endpoints.Endpoints { + if ep.Port != expectedPort { + Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port)) } - if port != expectedPort { - Fail(fmt.Sprintf("invalid port, expected %s, got %s", expectedPort, port)) - } - ips.Insert(host) + ips.Insert(ep.IP) } for _, name := range expectedEndpoints { @@ -272,7 +267,7 @@ func validateIPsOrFail(c *client.Client, ns, expectedPort string, expectedEndpoi } } -func validateEndpointsOrFail(c *client.Client, ns, serviceName, expectedPort string, expectedEndpoints []string) { +func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedPort int, expectedEndpoints []string) { for { endpoints, err := c.Endpoints(ns).Get(serviceName) if err == nil {