diff --git a/pkg/api/types.go b/pkg/api/types.go index 5af815c162d..6b29e901d5f 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -747,6 +747,9 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` Endpoints []string `json:"endpoints,omitempty"` } diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go index 713458902c2..c833f9ffb60 100644 --- a/pkg/api/v1beta1/defaults.go +++ b/pkg/api/v1beta1/defaults.go @@ -85,5 +85,10 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *Endpoints) { + if obj.Protocol == "" { + obj.Protocol = "TCP" + } + }, ) } diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go index bef10ab9016..d7372045247 100644 --- a/pkg/api/v1beta1/defaults_test.go +++ b/pkg/api/v1beta1/defaults_test.go @@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) { t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type) } } + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + } +} diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index f0ced4ad101..3ab025579b0 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -601,7 +601,10 @@ type Service struct { // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { - TypeMeta `json:",inline"` + TypeMeta `json:",inline"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"` Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"` } diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go index ef0abf4100a..9242bf3a323 100644 --- a/pkg/api/v1beta2/defaults.go +++ b/pkg/api/v1beta2/defaults.go @@ -87,5 +87,10 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *Endpoints) { + if obj.Protocol == "" { + obj.Protocol = "TCP" + } + }, ) } diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go index fb14920244d..a73c83a93e6 100644 --- a/pkg/api/v1beta2/defaults_test.go +++ b/pkg/api/v1beta2/defaults_test.go @@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) { t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type) } } + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + } +} diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index ae729e7dc82..b0385a75ec1 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -565,7 +565,10 @@ type Service struct { // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { - TypeMeta `json:",inline"` + TypeMeta `json:",inline"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"` Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"` } diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index 4abfa2c8178..3149601fb64 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -80,5 +80,10 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *Endpoints) { + if obj.Protocol == "" { + obj.Protocol = "TCP" + } + }, ) } diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index 370e3f574c4..e1ebf388b1e 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) { t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type) } } + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + } +} diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index b7bb118844d..412a02a9660 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -779,6 +779,10 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` + // Endpoints is the list of host ports that satisfy the service selector Endpoints []string `json:"endpoints"` } diff --git a/pkg/master/publish.go b/pkg/master/publish.go index ced152954b9..a9e69635875 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -135,6 +135,7 @@ func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) err Name: serviceName, Namespace: api.NamespaceDefault, }, + Protocol: "TCP", } } found := false diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 83f47e52b96..1b8ec37f1e8 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -598,10 +598,10 @@ func TestEtcdListEndpoints(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []string{"127.0.0.1:8345"}}), }, { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), }, }, }, @@ -625,6 +625,7 @@ func TestEtcdGetEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, + Protocol: "TCP", Endpoints: []string{"127.0.0.1:34855"}, } @@ -648,6 +649,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, + Protocol: "TCP", Endpoints: []string{"baz", "bar"}, } diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 82c1ce2a0d8..fa5b47713a3 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -98,6 +98,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { ObjectMeta: api.ObjectMeta{ Name: service.Name, }, + Protocol: service.Spec.Protocol, } } else { glog.Errorf("Error getting endpoints: %v", err) @@ -113,8 +114,8 @@ func (e *EndpointController) SyncServiceEndpoints() error { _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - if endpointsEqual(currentEndpoints, endpoints) { - glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) { + glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) continue } _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 3fcf227f602..bab820bc2e9 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -245,6 +245,71 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Name: "foo", ResourceVersion: "1", }, + Protocol: "TCP", + Endpoints: []string{"6.7.8.9:1000"}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + endpoints := NewEndpointController(client) + if err := endpoints.SyncServiceEndpoints(); err != nil { + t.Errorf("unexpected error: %v", err) + } + endpointsHandler.ValidateRequestCount(t, 0) +} + +func TestSyncEndpointsProtocolTCP(t *testing.T) { + serviceList := api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Protocol: "TCP", + }, + }, + }, + } + testServer, endpointsHandler := makeTestServer(t, + serverResponse{http.StatusOK, newPodList(0)}, + serverResponse{http.StatusOK, &serviceList}, + serverResponse{http.StatusOK, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + ResourceVersion: "1", + }, + Protocol: "TCP", + Endpoints: []string{"6.7.8.9:1000"}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + endpoints := NewEndpointController(client) + if err := endpoints.SyncServiceEndpoints(); err != nil { + t.Errorf("unexpected error: %v", err) + } + endpointsHandler.ValidateRequestCount(t, 0) +} + +func TestSyncEndpointsProtocolUDP(t *testing.T) { + serviceList := api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Protocol: "UDP", + }, + }, + }, + } + testServer, endpointsHandler := makeTestServer(t, + serverResponse{http.StatusOK, newPodList(0)}, + serverResponse{http.StatusOK, &serviceList}, + serverResponse{http.StatusOK, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + ResourceVersion: "1", + }, + Protocol: "UDP", Endpoints: []string{"6.7.8.9:1000"}, }}) defer testServer.Close() @@ -275,6 +340,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, + Protocol: "TCP", Endpoints: []string{}, }}) defer testServer.Close() @@ -288,6 +354,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, + Protocol: "TCP", Endpoints: []string{"1.2.3.4:8080"}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data) @@ -314,6 +381,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, + Protocol: "TCP", Endpoints: []string{"6.7.8.9:1000"}, }}) defer testServer.Close() @@ -327,6 +395,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, + Protocol: "TCP", Endpoints: []string{"1.2.3.4:8080"}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data) @@ -352,6 +421,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, + Protocol: "TCP", Endpoints: []string{"1.2.3.4:8080"}, }}) defer testServer.Close() @@ -390,6 +460,7 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "", }, + Protocol: "TCP", Endpoints: []string{"1.2.3.4:8080"}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data)