From d4e1076306b3bb71aafbafe8e1fd0efa7346eaeb Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Thu, 16 Oct 2014 17:16:40 -0400 Subject: [PATCH] Fix etcd in proxy for namespace awareness --- pkg/proxy/config/etcd.go | 81 +++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index 15efb2371ca..012465f686b 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -116,10 +116,50 @@ func (s ConfigSourceEtcd) Run() { } } +// decodeServices recurses from the root of the service storage directory into each namespace to get each service and endpoint object +func (s ConfigSourceEtcd) decodeServices(node *etcd.Node, retServices []api.Service, retEndpoints []api.Endpoints) ([]api.Service, []api.Endpoints, error) { + // TODO this needs to go against API server desperately, so much redundant error prone code here + // we hit a namespace boundary, recurse until we find actual nodes + if node.Dir == true { + for _, n := range node.Nodes { + var err error // Don't shadow the ret* variables. + retServices, retEndpoints, err = s.decodeServices(n, retServices, retEndpoints) + if err != nil { + return retServices, retEndpoints, err + } + } + return retServices, retEndpoints, nil + } + + // we have an actual service node + var svc api.Service + err := latest.Codec.DecodeInto([]byte(node.Value), &svc) + if err != nil { + glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err) + } else { + // so we got a service we can handle, and now get endpoints + retServices = append(retServices, svc) + // get the endpoints + endpoints, err := s.GetEndpoints(svc.Namespace, svc.ID) + if err != nil { + if tools.IsEtcdNotFound(err) { + glog.V(4).Infof("Unable to get endpoints for %s %s : %v", svc.Namespace, svc.ID, err) + } + glog.Errorf("Couldn't get endpoints for %s %s : %v skipping", svc.Namespace, svc.ID, err) + endpoints = api.Endpoints{} + } else { + glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.ID, svc.Port, endpoints) + } + retEndpoints = append(retEndpoints, endpoints) + } + return retServices, retEndpoints, nil +} + // GetServices finds the list of services and their endpoints from etcd. // This operation is akin to a set a known good at regular intervals. func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { - response, err := s.client.Get(registryRoot+"/specs", true, false) + // this is a recursive query now that services are namespaced under "/registry/services/specs//" + response, err := s.client.Get(registryRoot+"/specs", false, true) if err != nil { if tools.IsEtcdNotFound(err) { glog.V(4).Infof("Failed to get the key %s: %v", registryRoot, err) @@ -128,40 +168,18 @@ func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) } return []api.Service{}, []api.Endpoints{}, err } + // this code needs to go through the API server in the future, this is one big hack if response.Node.Dir == true { - retServices := make([]api.Service, len(response.Node.Nodes)) - retEndpoints := make([]api.Endpoints, len(response.Node.Nodes)) - // Ok, so we have directories, this list should be the list - // of services. Find the local port to listen on and remote endpoints - // and create a Service entry for it. - for i, node := range response.Node.Nodes { - var svc api.Service - err = latest.Codec.DecodeInto([]byte(node.Value), &svc) - if err != nil { - glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err) - continue - } - retServices[i] = svc - endpoints, err := s.GetEndpoints(svc.ID) - if err != nil { - if tools.IsEtcdNotFound(err) { - glog.V(4).Infof("Unable to get endpoints for %s : %v", svc.ID, err) - } - glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err) - endpoints = api.Endpoints{} - } else { - glog.V(3).Infof("Got service: %s on localport %d mapping to: %s", svc.ID, svc.Port, endpoints) - } - retEndpoints[i] = endpoints - } - return retServices, retEndpoints, err + retServices := []api.Service{} + retEndpoints := []api.Endpoints{} + return s.decodeServices(response.Node, retServices, retEndpoints) } return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot) } // GetEndpoints finds the list of endpoints of the service from etcd. -func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { - key := path.Join(registryRoot, "endpoints", service) +func (s ConfigSourceEtcd) GetEndpoints(namespace, service string) (api.Endpoints, error) { + key := path.Join(registryRoot, "endpoints", namespace, service) response, err := s.client.Get(key, true, false) if err != nil { glog.Errorf("Failed to get the key: %s %v", key, err) @@ -195,7 +213,10 @@ func (s ConfigSourceEtcd) WatchForChanges() { if !ok { break } - s.ProcessChange(watchResponse) + // only listen for non directory changes + if watchResponse.Node.Dir == false { + s.ProcessChange(watchResponse) + } } }