From daed0af3b54fb0c7c324c90d23f6039f0f4085e4 Mon Sep 17 00:00:00 2001 From: Federico Simoncelli Date: Tue, 17 Feb 2015 08:24:05 -0500 Subject: [PATCH] api: return endpoints target object references Signed-off-by: Federico Simoncelli --- pkg/api/types.go | 3 ++ pkg/api/v1beta1/conversion.go | 22 +++++++++++- pkg/api/v1beta1/types.go | 8 +++++ pkg/api/v1beta2/conversion.go | 22 +++++++++++- pkg/api/v1beta2/types.go | 8 +++++ pkg/api/v1beta3/types.go | 3 ++ pkg/master/publish_test.go | 8 ++--- pkg/service/endpoints_controller.go | 30 +++++++++++++--- pkg/service/endpoints_controller_test.go | 44 +++++++++++++++++++----- 9 files changed, 130 insertions(+), 18 deletions(-) diff --git a/pkg/api/types.go b/pkg/api/types.go index 92af31a4f21..e0bd74b28ac 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -767,6 +767,9 @@ type Endpoint struct { // Required: The destination port to access. Port int `json:"port"` + + // Optional: The kubernetes object related to the entry point. + TargetRef *ObjectReference `json:"targetRef,omitempty"` } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index b1635dc9e5b..b2d1dc9225f 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -36,6 +36,7 @@ func init() { newer.Scheme.AddStructFieldConversion(newer.TypeMeta{}, "TypeMeta", TypeMeta{}, "TypeMeta") newer.Scheme.AddStructFieldConversion(newer.ObjectMeta{}, "ObjectMeta", TypeMeta{}, "TypeMeta") newer.Scheme.AddStructFieldConversion(newer.ListMeta{}, "ListMeta", TypeMeta{}, "TypeMeta") + newer.Scheme.AddStructFieldConversion(newer.Endpoints{}, "Endpoints", Endpoints{}, "Endpoints") // TODO: scope this to a specific type once that becomes available and remove the Event conversion functions below // newer.Scheme.AddStructFieldConversion(string(""), "Status", string(""), "Condition") @@ -1177,7 +1178,17 @@ func init() { } for i := range in.Endpoints { ep := &in.Endpoints[i] - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + hostPort := net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)) + out.Endpoints = append(out.Endpoints, hostPort) + if ep.TargetRef != nil { + target := EndpointObjectReference{ + Endpoint: hostPort, + } + if err := s.Convert(ep.TargetRef, &target.ObjectReference, 0); err != nil { + return err + } + out.TargetRefs = append(out.TargetRefs, target) + } } return nil }, @@ -1204,6 +1215,15 @@ func init() { return err } ep.Port = pn + for j := range in.TargetRefs { + if in.TargetRefs[j].Endpoint != in.Endpoints[i] { + continue + } + ep.TargetRef = &newer.ObjectReference{} + if err := s.Convert(&in.TargetRefs[j].ObjectReference, ep.TargetRef, 0); err != nil { + return err + } + } } return nil }, diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 924b3d768a9..cd5afd77c45 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -602,6 +602,12 @@ type Service struct { SessionAffinity AffinityType `json:"sessionAffinity,omitempty" description:"enable client IP based session affinity; must be ClientIP or None; defaults to None"` } +// EndpointObjectReference is a reference to an object exposing the endpoint +type EndpointObjectReference struct { + Endpoint string `json:"endpoint" description:"endpoint exposed by the referenced object"` + ObjectReference `json:"targetRef" description:"reference to the object providing the entry point"` +} + // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { @@ -610,6 +616,8 @@ type Endpoints struct { // "UDP". Defaults to "TCP". Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"` Endpoints []string `json:"endpoints" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"` + // Optional: The kubernetes object related to the entry point. + TargetRefs []EndpointObjectReference `json:"targetRefs,omitempty" description:"list of references to objects providing the endpoints"` } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index d6cf154f26c..ef5d65aee0a 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -36,6 +36,7 @@ func init() { newer.Scheme.AddStructFieldConversion(newer.TypeMeta{}, "TypeMeta", TypeMeta{}, "TypeMeta") newer.Scheme.AddStructFieldConversion(newer.ObjectMeta{}, "ObjectMeta", TypeMeta{}, "TypeMeta") newer.Scheme.AddStructFieldConversion(newer.ListMeta{}, "ListMeta", TypeMeta{}, "TypeMeta") + newer.Scheme.AddStructFieldConversion(newer.Endpoints{}, "Endpoints", Endpoints{}, "Endpoints") // TODO: scope this to a specific type once that becomes available and remove the Event conversion functions below // newer.Scheme.AddStructFieldConversion(string(""), "Status", string(""), "Condition") @@ -1093,7 +1094,17 @@ func init() { } for i := range in.Endpoints { ep := &in.Endpoints[i] - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + hostPort := net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)) + out.Endpoints = append(out.Endpoints, hostPort) + if ep.TargetRef != nil { + target := EndpointObjectReference{ + Endpoint: hostPort, + } + if err := s.Convert(ep.TargetRef, &target.ObjectReference, 0); err != nil { + return err + } + out.TargetRefs = append(out.TargetRefs, target) + } } return nil }, @@ -1120,6 +1131,15 @@ func init() { return err } ep.Port = pn + for j := range in.TargetRefs { + if in.TargetRefs[j].Endpoint != in.Endpoints[i] { + continue + } + ep.TargetRef = &newer.ObjectReference{} + if err := s.Convert(&in.TargetRefs[j], ep.TargetRef, 0); err != nil { + return err + } + } } return nil }, diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 58a853b1344..3696f464cff 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -607,6 +607,12 @@ type Service struct { SessionAffinity AffinityType `json:"sessionAffinity,omitempty" description:"enable client IP based session affinity; must be ClientIP or None; defaults to None"` } +// EndpointObjectReference is a reference to an object exposing the endpoint +type EndpointObjectReference struct { + Endpoint string `json:"endpoint" description:"endpoint exposed by the referenced object"` + ObjectReference `json:"targetRef" description:"reference to the object providing the entry point"` +} + // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { @@ -615,6 +621,8 @@ type Endpoints struct { // "UDP". Defaults to "TCP". Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"` Endpoints []string `json:"endpoints" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"` + // Optional: The kubernetes object related to the entry point. + TargetRefs []EndpointObjectReference `json:"targetRefs,omitempty" description:"list of references to objects providing the endpoints"` } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index c347a25f4a8..ac2f09f9d09 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -798,6 +798,9 @@ type Endpoint struct { // Required: The destination port to access. Port int `json:"port" description:"destination port of this endpoint"` + + // Optional: The kubernetes object related to the entry point. + TargetRef *ObjectReference `json:"targetRef,omitempty" description:"reference to object providing the endpoint"` } // EndpointsList is a list of endpoints. diff --git a/pkg/master/publish_test.go b/pkg/master/publish_test.go index b9fd3b040ab..bcff8fe142d 100644 --- a/pkg/master/publish_test.go +++ b/pkg/master/publish_test.go @@ -69,7 +69,7 @@ func TestEnsureEndpointsContain(t *testing.T) { }, }, masterCount: 1, - expectedEndpoints: []api.Endpoint{{"1.2.3.4", 8080}}, + expectedEndpoints: []api.Endpoint{{"1.2.3.4", 8080, nil}}, }, { serviceName: "foo", @@ -120,7 +120,7 @@ func TestEnsureEndpointsContain(t *testing.T) { }, }, masterCount: 2, - expectedEndpoints: []api.Endpoint{{"4.3.2.1", 9090}, {"1.2.3.4", 8080}}, + expectedEndpoints: []api.Endpoint{{"4.3.2.1", 9090, nil}, {"1.2.3.4", 8080, nil}}, }, { serviceName: "foo", @@ -150,7 +150,7 @@ func TestEnsureEndpointsContain(t *testing.T) { }, }, masterCount: 2, - expectedEndpoints: []api.Endpoint{{"1.2.3.4", 8000}, {"1.2.3.4", 8080}}, + expectedEndpoints: []api.Endpoint{{"1.2.3.4", 8000, nil}, {"1.2.3.4", 8080, nil}}, }, } for _, test := range tests { @@ -170,7 +170,7 @@ func TestEnsureEndpointsContain(t *testing.T) { } if test.expectUpdate { if test.expectedEndpoints == nil { - test.expectedEndpoints = []api.Endpoint{{test.ip, test.port}} + test.expectedEndpoints = []api.Endpoint{{test.ip, test.port, nil}} } expectedUpdate := api.Endpoints{ ObjectMeta: api.ObjectMeta{ diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index af8f31c73b8..453153d40b2 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -93,7 +93,17 @@ func (e *EndpointController) SyncServiceEndpoints() error { continue } - endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port}) + endpoints = append(endpoints, api.Endpoint{ + IP: pod.Status.PodIP, + Port: port, + TargetRef: &api.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }, + }) } currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) if err != nil { @@ -118,7 +128,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) { + if currentEndpoints.Protocol == service.Spec.Protocol && endpointsListEqual(currentEndpoints, endpoints) { glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) continue } @@ -132,19 +142,31 @@ func (e *EndpointController) SyncServiceEndpoints() error { return resultErr } +func endpointEqual(this, that *api.Endpoint) bool { + if this.IP != that.IP || this.Port != that.Port { + return false + } + + if this.TargetRef == nil || that.TargetRef == nil { + return this.TargetRef == that.TargetRef + } + + return *this.TargetRef == *that.TargetRef +} + func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool { if haystack == nil || needle == nil { return false } for ix := range haystack.Endpoints { - if haystack.Endpoints[ix] == *needle { + if endpointEqual(&haystack.Endpoints[ix], needle) { return true } } return false } -func endpointsEqual(eps *api.Endpoints, endpoints []api.Endpoint) bool { +func endpointsListEqual(eps *api.Endpoints, endpoints []api.Endpoint) bool { if len(eps.Endpoints) != len(endpoints) { return false } diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 21d28a8dbd9..515cb34ade9 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -408,8 +408,15 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{ + IP: "1.2.3.4", + Port: 8080, + TargetRef: &api.ObjectReference{ + Kind: "Pod", + Name: "pod0", + }, + }}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data) } @@ -449,8 +456,15 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{ + IP: "1.2.3.4", + Port: 8080, + TargetRef: &api.ObjectReference{ + Kind: "Pod", + Name: "pod0", + }, + }}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data) } @@ -475,8 +489,15 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{ + IP: "1.2.3.4", + Port: 8080, + TargetRef: &api.ObjectReference{ + Kind: "Pod", + Name: "pod0", + }, + }}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -514,8 +535,15 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{ + IP: "1.2.3.4", + Port: 8080, + TargetRef: &api.ObjectReference{ + Kind: "Pod", + Name: "pod0", + }, + }}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data) }