From 17c82606eb7868e40eddc9846ca436d883f2cf5f Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 26 Sep 2014 13:34:55 -0700 Subject: [PATCH] Second step in decoupling the service controller, use the apiserver for writes too. --- pkg/client/client.go | 20 +++++ pkg/registry/endpoint/rest.go | 35 +++++++- pkg/service/endpoints_controller.go | 59 ++++++++++-- pkg/service/endpoints_controller_test.go | 110 ++++++++++++++++++++--- 4 files changed, 201 insertions(+), 23 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 0ca6da7bad1..94c1a143946 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -448,6 +448,26 @@ func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion ui Watch() } +func (c *Client) CreateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) { + result := &api.Endpoints{} + err := c.Post().Path("endpoints").Body(endpoints).Do().Into(result) + return result, err +} + +func (c *Client) UpdateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) { + result := &api.Endpoints{} + if endpoints.ResourceVersion == 0 { + return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints) + } + err := c.Put(). + Path("endpoints"). + Path(endpoints.ID). + Body(endpoints). + Do(). + Into(result) + return result, err +} + // ServerVersion retrieves and parses the server's version. func (c *Client) ServerVersion() (*version.Info, error) { body, err := c.Get().AbsPath("/version").Do().Raw() diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index 2ea28e540c9..ad8068b5d62 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -18,10 +18,13 @@ package endpoint import ( "errors" + "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -56,14 +59,38 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion) } -// Create satisfies the RESTStorage interface but is unimplemented. +// Create satisfies the RESTStorage interface. func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { - return nil, errors.New("unimplemented") + endpoints, ok := obj.(*api.Endpoints) + if !ok { + return nil, fmt.Errorf("not an endpoints: %#v", obj) + } + if len(endpoints.ID) == 0 { + return nil, fmt.Errorf("id is required: %#v", obj) + } + endpoints.CreationTimestamp = util.Now() + return apiserver.MakeAsync(func() (runtime.Object, error) { + err := rs.registry.UpdateEndpoints(ctx, endpoints) + if err != nil { + return nil, err + } + return rs.registry.GetEndpoints(ctx, endpoints.ID) + }), nil } -// Update satisfies the RESTStorage interface but is unimplemented. +// Update satisfies the RESTStorage interface. func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { - return nil, errors.New("unimplemented") + endpoints, ok := obj.(*api.Endpoints) + if !ok { + return nil, fmt.Errorf("not an endpoints: %#v", obj) + } + return apiserver.MakeAsync(func() (runtime.Object, error) { + err := rs.registry.UpdateEndpoints(ctx, endpoints) + if err != nil { + return nil, err + } + return rs.registry.GetEndpoints(ctx, endpoints.ID) + }), nil } // Delete satisfies the RESTStorage interface but is unimplemented. diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 40af7c0ad4e..acfac20e252 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "strconv" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -72,11 +73,35 @@ func (e *EndpointController) SyncServiceEndpoints() error { } endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) } - // TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop. - err = e.serviceRegistry.UpdateEndpoints(api.NewContext(), &api.Endpoints{ - JSONBase: api.JSONBase{ID: service.ID}, - Endpoints: endpoints, - }) + currentEndpoints, err := e.client.GetEndpoints(service.ID) + if err != nil { + // TODO this is brittle as all get out, refactor the client libraries to return a structured error. + if strings.Contains(err.Error(), "(404)") { + currentEndpoints = &api.Endpoints{ + JSONBase: api.JSONBase{ + ID: service.ID, + }, + } + } else { + glog.Errorf("Error getting endpoints: %#v", err) + continue + } + } + newEndpoints := &api.Endpoints{} + *newEndpoints = *currentEndpoints + newEndpoints.Endpoints = endpoints + + if currentEndpoints.ResourceVersion == 0 { + // No previous endpoints, create them + _, err = e.client.CreateEndpoints(newEndpoints) + } else { + // Pre-existing + if endpointsEqual(currentEndpoints, endpoints) { + glog.V(2).Infof("endpoints are equal for %s, skipping update", service.ID) + continue + } + _, err = e.client.UpdateEndpoints(newEndpoints) + } if err != nil { glog.Errorf("Error updating endpoints: %#v", err) continue @@ -85,6 +110,30 @@ func (e *EndpointController) SyncServiceEndpoints() error { return resultErr } +func containsEndpoint(endpoints *api.Endpoints, endpoint string) bool { + if endpoints == nil { + return false + } + for ix := range endpoints.Endpoints { + if endpoints.Endpoints[ix] == endpoint { + return true + } + } + return false +} + +func endpointsEqual(e *api.Endpoints, endpoints []string) bool { + if len(e.Endpoints) != len(endpoints) { + return false + } + for _, endpoint := range endpoints { + if !containsEndpoint(e, endpoint) { + return false + } + } + return true +} + // findPort locates the container port for the given manifest and portName. func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) || diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 55c71a09055..832277441ee 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -24,8 +24,10 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -127,7 +129,7 @@ type serverResponse struct { obj interface{} } -func makeTestServer(t *testing.T, podResponse serverResponse, serviceResponse serverResponse) *httptest.Server { +func makeTestServer(t *testing.T, podResponse serverResponse, serviceResponse serverResponse, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) { fakePodHandler := util.FakeHandler{ StatusCode: podResponse.statusCode, ResponseBody: util.EncodeJSON(podResponse.obj), @@ -136,20 +138,27 @@ func makeTestServer(t *testing.T, podResponse serverResponse, serviceResponse se StatusCode: serviceResponse.statusCode, ResponseBody: util.EncodeJSON(serviceResponse.obj), } + fakeEndpointsHandler := util.FakeHandler{ + StatusCode: endpointsResponse.statusCode, + ResponseBody: util.EncodeJSON(endpointsResponse.obj), + } mux := http.NewServeMux() mux.Handle("/api/v1beta1/pods", &fakePodHandler) mux.Handle("/api/v1beta1/services", &fakeServiceHandler) + mux.Handle("/api/v1beta1/endpoints", &fakeEndpointsHandler) + mux.Handle("/api/v1beta1/endpoints/", &fakeEndpointsHandler) mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { t.Errorf("unexpected request: %v", req.RequestURI) res.WriteHeader(http.StatusNotFound) }) - return httptest.NewServer(mux) + return httptest.NewServer(mux), &fakeEndpointsHandler } func TestSyncEndpointsEmpty(t *testing.T) { - testServer := makeTestServer(t, + testServer, _ := makeTestServer(t, serverResponse{http.StatusOK, newPodList(0)}, - serverResponse{http.StatusOK, api.ServiceList{}}) + serverResponse{http.StatusOK, api.ServiceList{}}, + serverResponse{http.StatusOK, api.Endpoints{}}) client := client.NewOrDie(testServer.URL, "v1beta1", nil) serviceRegistry := registrytest.ServiceRegistry{} endpoints := NewEndpointController(&serviceRegistry, client) @@ -159,9 +168,10 @@ func TestSyncEndpointsEmpty(t *testing.T) { } func TestSyncEndpointsError(t *testing.T) { - testServer := makeTestServer(t, + testServer, _ := makeTestServer(t, serverResponse{http.StatusOK, newPodList(0)}, - serverResponse{http.StatusInternalServerError, api.ServiceList{}}) + serverResponse{http.StatusInternalServerError, api.ServiceList{}}, + serverResponse{http.StatusOK, api.Endpoints{}}) client := client.NewOrDie(testServer.URL, "v1beta1", nil) serviceRegistry := registrytest.ServiceRegistry{ Err: fmt.Errorf("test error"), @@ -172,29 +182,100 @@ func TestSyncEndpointsError(t *testing.T) { } } -func TestSyncEndpointsItems(t *testing.T) { +func TestSyncEndpointsItemsPreexisting(t *testing.T) { serviceList := api.ServiceList{ Items: []api.Service{ { + JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{ "foo": "bar", }, }, }, } - testServer := makeTestServer(t, + testServer, endpointsHandler := makeTestServer(t, serverResponse{http.StatusOK, newPodList(1)}, - serverResponse{http.StatusOK, serviceList}) + serverResponse{http.StatusOK, serviceList}, + serverResponse{http.StatusOK, api.Endpoints{ + JSONBase: api.JSONBase{ + ID: "foo", + ResourceVersion: 1, + }, + Endpoints: []string{"6.7.8.9:1000"}, + }}) client := client.NewOrDie(testServer.URL, "v1beta1", nil) serviceRegistry := registrytest.ServiceRegistry{} endpoints := NewEndpointController(&serviceRegistry, client) if err := endpoints.SyncServiceEndpoints(); err != nil { t.Errorf("unexpected error: %v", err) } - if len(serviceRegistry.Endpoints.Endpoints) != 1 || - serviceRegistry.Endpoints.Endpoints[0] != "1.2.3.4:8080" { - t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.Endpoints) + data := runtime.EncodeOrDie(v1beta1.Codec, &api.Endpoints{ + JSONBase: api.JSONBase{ + ID: "foo", + ResourceVersion: 1, + }, + Endpoints: []string{"1.2.3.4:8080"}, + }) + endpointsHandler.ValidateRequest(t, "/api/v1beta1/endpoints/foo", "PUT", &data) +} + +func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { + serviceList := api.ServiceList{ + Items: []api.Service{ + { + JSONBase: api.JSONBase{ID: "foo"}, + Selector: map[string]string{ + "foo": "bar", + }, + }, + }, } + testServer, endpointsHandler := makeTestServer(t, + serverResponse{http.StatusOK, newPodList(1)}, + serverResponse{http.StatusOK, serviceList}, + serverResponse{http.StatusOK, api.Endpoints{ + JSONBase: api.JSONBase{ + ResourceVersion: 1, + }, + Endpoints: []string{"1.2.3.4:8080"}, + }}) + client := client.NewOrDie(testServer.URL, "v1beta1", nil) + serviceRegistry := registrytest.ServiceRegistry{} + endpoints := NewEndpointController(&serviceRegistry, client) + if err := endpoints.SyncServiceEndpoints(); err != nil { + t.Errorf("unexpected error: %v", err) + } + endpointsHandler.ValidateRequest(t, "/api/v1beta1/endpoints/foo", "GET", nil) +} + +func TestSyncEndpointsItems(t *testing.T) { + serviceList := api.ServiceList{ + Items: []api.Service{ + { + JSONBase: api.JSONBase{ID: "foo"}, + Selector: map[string]string{ + "foo": "bar", + }, + }, + }, + } + testServer, endpointsHandler := makeTestServer(t, + serverResponse{http.StatusOK, newPodList(1)}, + serverResponse{http.StatusOK, serviceList}, + serverResponse{http.StatusOK, api.Endpoints{}}) + client := client.NewOrDie(testServer.URL, "v1beta1", nil) + serviceRegistry := registrytest.ServiceRegistry{} + endpoints := NewEndpointController(&serviceRegistry, client) + if err := endpoints.SyncServiceEndpoints(); err != nil { + t.Errorf("unexpected error: %v", err) + } + data := runtime.EncodeOrDie(v1beta1.Codec, &api.Endpoints{ + JSONBase: api.JSONBase{ + ResourceVersion: 0, + }, + Endpoints: []string{"1.2.3.4:8080"}, + }) + endpointsHandler.ValidateRequest(t, "/api/v1beta1/endpoints", "POST", &data) } func TestSyncEndpointsPodError(t *testing.T) { @@ -207,9 +288,10 @@ func TestSyncEndpointsPodError(t *testing.T) { }, }, } - testServer := makeTestServer(t, + testServer, _ := makeTestServer(t, serverResponse{http.StatusInternalServerError, api.PodList{}}, - serverResponse{http.StatusOK, serviceList}) + serverResponse{http.StatusOK, serviceList}, + serverResponse{http.StatusOK, api.Endpoints{}}) client := client.NewOrDie(testServer.URL, "v1beta1", nil) serviceRegistry := registrytest.ServiceRegistry{ List: api.ServiceList{