All types stored in etcd are now API objects.

This means I made an api.ContainerManifestList, and added a JSONBase to
endpoints (and changed Name -> JSONBase.ID).
This commit is contained in:
Daniel Smith 2014-07-22 18:53:41 -07:00
parent f1d6069b01
commit b3cc696486
15 changed files with 167 additions and 151 deletions

View File

@ -44,6 +44,8 @@ func init() {
Status{}, Status{},
ServerOpList{}, ServerOpList{},
ServerOp{}, ServerOp{},
ContainerManifestList{},
Endpoints{},
) )
AddKnownTypes("v1beta1", AddKnownTypes("v1beta1",
v1beta1.PodList{}, v1beta1.PodList{},
@ -106,6 +108,15 @@ func FindJSONBaseRO(obj interface{}) (JSONBase, error) {
return jsonBase.Interface().(JSONBase), nil return jsonBase.Interface().(JSONBase), nil
} }
// EncodeOrDie is a version of Encode which will panic instead of returning an error. For tests.
func EncodeOrDie(obj interface{}) string {
bytes, err := Encode(obj)
if err != nil {
panic(err)
}
return string(bytes)
}
// Encode turns the given api object into an appropriate JSON string. // Encode turns the given api object into an appropriate JSON string.
// Will return an error if the object doesn't have an embedded JSONBase. // Will return an error if the object doesn't have an embedded JSONBase.
// Obj may be a pointer to a struct, or a struct. If a struct, a copy // Obj may be a pointer to a struct, or a struct. If a struct, a copy

View File

@ -58,6 +58,12 @@ type ContainerManifest struct {
Containers []Container `yaml:"containers" json:"containers"` Containers []Container `yaml:"containers" json:"containers"`
} }
// ContainerManifestList is used to communicate container manifests to kubelet.
type ContainerManifestList struct {
JSONBase `json:",inline" yaml:",inline"`
Items []ContainerManifest `json:"items,omitempty" yaml:"items,omitempty"`
}
// Volume represents a named volume in a pod that may be accessed by any containers in the pod. // Volume represents a named volume in a pod that may be accessed by any containers in the pod.
type Volume struct { type Volume struct {
// Required: This must be a DNS_LABEL. Each volume in a pod must have // Required: This must be a DNS_LABEL. Each volume in a pod must have
@ -289,8 +295,8 @@ type Service struct {
// Endpoints is a collection of endpoints that implement the actual service, for example: // Endpoints is a collection of endpoints that implement the actual service, for example:
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
type Endpoints struct { type Endpoints struct {
Name string JSONBase `json:",inline" yaml:",inline"`
Endpoints []string Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"`
} }
// Minion is a worker node in Kubernetenes. // Minion is a worker node in Kubernetenes.

View File

@ -9,7 +9,7 @@ You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
@ -110,12 +110,12 @@ func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) {
return pods, fmt.Errorf("no nodes field: %v", response) return pods, fmt.Errorf("no nodes field: %v", response)
} }
manifests := []api.ContainerManifest{} manifests := api.ContainerManifestList{}
if err := yaml.Unmarshal([]byte(response.Node.Value), &manifests); err != nil { if err := yaml.Unmarshal([]byte(response.Node.Value), &manifests); err != nil {
return pods, fmt.Errorf("could not unmarshal manifests: %v", err) return pods, fmt.Errorf("could not unmarshal manifests: %v", err)
} }
for i, manifest := range manifests { for i, manifest := range manifests.Items {
name := manifest.ID name := manifest.ID
if name == "" { if name == "" {
name = fmt.Sprintf("_%d", i+1) name = fmt.Sprintf("_%d", i+1)

View File

@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -36,7 +35,9 @@ func TestGetEtcdData(t *testing.T) {
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), Value: api.EncodeOrDie(&api.ContainerManifestList{
Items: []api.ContainerManifest{{ID: "foo"}},
}),
ModifiedIndex: 1, ModifiedIndex: 1,
}, },
}, },
@ -76,7 +77,9 @@ func TestGetEtcd(t *testing.T) {
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), Value: api.EncodeOrDie(&api.ContainerManifestList{
Items: []api.ContainerManifest{{ID: "foo"}},
}),
ModifiedIndex: 1, ModifiedIndex: 1,
}, },
}, },
@ -103,7 +106,7 @@ func TestWatchEtcd(t *testing.T) {
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}), Value: api.EncodeOrDie(&api.ContainerManifestList{}),
ModifiedIndex: 2, ModifiedIndex: 2,
}, },
}, },

View File

@ -127,19 +127,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
case ADD: case ADD:
glog.Infof("Adding new endpoint from source %s : %v", source, update.Endpoints) glog.Infof("Adding new endpoint from source %s : %v", source, update.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
endpoints[value.Name] = value endpoints[value.ID] = value
} }
case REMOVE: case REMOVE:
glog.Infof("Removing an endpoint %v", update) glog.Infof("Removing an endpoint %v", update)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
delete(endpoints, value.Name) delete(endpoints, value.ID)
} }
case SET: case SET:
glog.Infof("Setting endpoints %v", update) glog.Infof("Setting endpoints %v", update)
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
endpoints = make(map[string]api.Endpoints) endpoints = make(map[string]api.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
endpoints[value.Name] = value endpoints[value.ID] = value
} }
default: default:
glog.Infof("Received invalid update type: %v", update) glog.Infof("Received invalid update type: %v", update)

View File

@ -83,7 +83,7 @@ func (s sortedEndpoints) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
func (s sortedEndpoints) Less(i, j int) bool { func (s sortedEndpoints) Less(i, j int) bool {
return s[i].Name < s[j].Name return s[i].ID < s[j].ID
} }
type EndpointsHandlerMock struct { type EndpointsHandlerMock struct {
@ -216,8 +216,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
handler2 := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler) config.RegisterHandler(handler)
config.RegisterHandler(handler2) config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}}) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}}) JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint1", "endpoint2"},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
JSONBase: api.JSONBase{ID: "bar"},
Endpoints: []string{"endpoint3", "endpoint4"},
})
handler.Wait(2) handler.Wait(2)
handler2.Wait(2) handler2.Wait(2)
channelOne <- endpointsUpdate1 channelOne <- endpointsUpdate1
@ -236,8 +242,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2 := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler) config.RegisterHandler(handler)
config.RegisterHandler(handler2) config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}}) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}}) JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint1", "endpoint2"},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
JSONBase: api.JSONBase{ID: "bar"},
Endpoints: []string{"endpoint3", "endpoint4"},
})
handler.Wait(2) handler.Wait(2)
handler2.Wait(2) handler2.Wait(2)
channelOne <- endpointsUpdate1 channelOne <- endpointsUpdate1
@ -248,7 +260,10 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints)
// Add one more // Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foobar", Endpoints: []string{"endpoint5", "endpoint6"}}) endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
JSONBase: api.JSONBase{ID: "foobar"},
Endpoints: []string{"endpoint5", "endpoint6"},
})
handler.Wait(1) handler.Wait(1)
handler2.Wait(1) handler2.Wait(1)
channelTwo <- endpointsUpdate3 channelTwo <- endpointsUpdate3
@ -257,7 +272,10 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints)
// Update the "foo" service with new endpoints // Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint77"}}) endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint77"},
})
handler.Wait(1) handler.Wait(1)
handler2.Wait(1) handler2.Wait(1)
channelOne <- endpointsUpdate1 channelOne <- endpointsUpdate1
@ -266,7 +284,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service // Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{Name: "bar"}) endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{JSONBase: api.JSONBase{ID: "bar"}})
handler.Wait(1) handler.Wait(1)
handler2.Wait(1) handler2.Wait(1)
channelTwo <- endpointsUpdate2 channelTwo <- endpointsUpdate2

View File

@ -34,7 +34,6 @@ limitations under the License.
package config package config
import ( import (
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -127,7 +126,7 @@ func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error)
// and create a Service entry for it. // and create a Service entry for it.
for i, node := range response.Node.Nodes { for i, node := range response.Node.Nodes {
var svc api.Service var svc api.Service
err = json.Unmarshal([]byte(node.Value), &svc) err = api.DecodeInto([]byte(node.Value), &svc)
if err != nil { if err != nil {
glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err) glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
continue continue
@ -154,7 +153,9 @@ func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
return api.Endpoints{}, err return api.Endpoints{}, err
} }
// Parse all the endpoint specifications in this value. // Parse all the endpoint specifications in this value.
return parseEndpoints(response.Node.Value) var e api.Endpoints
err = api.DecodeInto([]byte(response.Node.Value), &e)
return e, err
} }
// etcdResponseToService takes an etcd response and pulls it apart to find service. // etcdResponseToService takes an etcd response and pulls it apart to find service.
@ -163,19 +164,13 @@ func etcdResponseToService(response *etcd.Response) (*api.Service, error) {
return nil, fmt.Errorf("invalid response from etcd: %#v", response) return nil, fmt.Errorf("invalid response from etcd: %#v", response)
} }
var svc api.Service var svc api.Service
err := json.Unmarshal([]byte(response.Node.Value), &svc) err := api.DecodeInto([]byte(response.Node.Value), &svc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &svc, err return &svc, err
} }
func parseEndpoints(jsonString string) (api.Endpoints, error) {
var e api.Endpoints
err := json.Unmarshal([]byte(jsonString), &e)
return e, err
}
func (s ConfigSourceEtcd) WatchForChanges() { func (s ConfigSourceEtcd) WatchForChanges() {
glog.Info("Setting up a watch for new services") glog.Info("Setting up a watch for new services")
watchChannel := make(chan *etcd.Response) watchChannel := make(chan *etcd.Response)
@ -220,7 +215,7 @@ func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) {
glog.Infof("Processing a change in endpoint configuration... %s", *response) glog.Infof("Processing a change in endpoint configuration... %s", *response)
var endpoints api.Endpoints var endpoints api.Endpoints
err := json.Unmarshal([]byte(response.Node.Value), &endpoints) err := api.DecodeInto([]byte(response.Node.Value), &endpoints)
if err != nil { if err != nil {
glog.Errorf("Failed to parse service out of etcd key: %v : %+v", response.Node.Value, err) glog.Errorf("Failed to parse service out of etcd key: %v : %+v", response.Node.Value, err)
return return

View File

@ -1,56 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"encoding/json"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34"
const TomcatService = "tomcat"
const TomcatContainerID = "tomcat-3bd5af34"
func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) {
endpoints, err := parseEndpoints(jsonString)
if err == nil && expectError {
t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString)
}
if err != nil && !expectError {
t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString)
}
if !reflect.DeepEqual(expectedEndpoints, endpoints) {
t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints)
}
}
func TestParseJsonEndpoints(t *testing.T) {
validateJSONParsing(t, "", api.Endpoints{}, true)
endpoints := api.Endpoints{
Name: "foo",
Endpoints: []string{"foo", "bar", "baz"},
}
data, err := json.Marshal(endpoints)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
validateJSONParsing(t, string(data), endpoints, false)
// validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false)
}

View File

@ -102,7 +102,7 @@ func (s ConfigSourceFile) Run() {
newEndpoints := make([]api.Endpoints, len(config.Services)) newEndpoints := make([]api.Endpoints, len(config.Services))
for i, service := range config.Services { for i, service := range config.Services {
newServices[i] = api.Service{JSONBase: api.JSONBase{ID: service.Name}, Port: service.Port} newServices[i] = api.Service{JSONBase: api.JSONBase{ID: service.Name}, Port: service.Port}
newEndpoints[i] = api.Endpoints{Name: service.Name, Endpoints: service.Endpoints} newEndpoints[i] = api.Endpoints{JSONBase: api.JSONBase{ID: service.Name}, Endpoints: service.Endpoints}
} }
if !reflect.DeepEqual(lastServices, newServices) { if !reflect.DeepEqual(lastServices, newServices) {
serviceUpdate := ServiceUpdate{Op: SET, Services: newServices} serviceUpdate := ServiceUpdate{Op: SET, Services: newServices}

View File

@ -52,7 +52,8 @@ func TestProxy(t *testing.T) {
} }
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) lb.OnUpdate([]api.Endpoints{
{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
p := NewProxier(lb) p := NewProxier(lb)

View File

@ -89,15 +89,15 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
defer impl.lock.Unlock() defer impl.lock.Unlock()
// First update / add all new endpoints for services. // First update / add all new endpoints for services.
for _, value := range endpoints { for _, value := range endpoints {
existingEndpoints, exists := impl.endpointsMap[value.Name] existingEndpoints, exists := impl.endpointsMap[value.ID]
validEndpoints := impl.filterValidEndpoints(value.Endpoints) validEndpoints := impl.filterValidEndpoints(value.Endpoints)
if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) { if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) {
glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints) glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.ID, value.Endpoints)
impl.endpointsMap[value.Name] = validEndpoints impl.endpointsMap[value.ID] = validEndpoints
// Start RR from the beginning if added or updated. // Start RR from the beginning if added or updated.
impl.rrIndex[value.Name] = 0 impl.rrIndex[value.ID] = 0
} }
tmp[value.Name] = true tmp[value.ID] = true
} }
// Then remove any endpoints no longer relevant // Then remove any endpoints no longer relevant
for key, value := range impl.endpointsMap { for key, value := range impl.endpointsMap {

View File

@ -87,7 +87,10 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1:40"}} endpoints[0] = api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint1:40"},
}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") expectEndpoint(t, loadBalancer, "foo", "endpoint1:40")
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") expectEndpoint(t, loadBalancer, "foo", "endpoint1:40")
@ -102,7 +105,10 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}} endpoints[0] = api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1") expectEndpoint(t, loadBalancer, "foo", "endpoint:1")
expectEndpoint(t, loadBalancer, "foo", "endpoint:2") expectEndpoint(t, loadBalancer, "foo", "endpoint:2")
@ -117,7 +123,10 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}} endpoints[0] = api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1") expectEndpoint(t, loadBalancer, "foo", "endpoint:1")
expectEndpoint(t, loadBalancer, "foo", "endpoint:2") expectEndpoint(t, loadBalancer, "foo", "endpoint:2")
@ -126,14 +135,16 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, "foo", "endpoint:2") expectEndpoint(t, loadBalancer, "foo", "endpoint:2")
// Then update the configuration with one fewer endpoints, make sure // Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again // we start in the beginning again
endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:8", "endpoint:9"}} endpoints[0] = api.Endpoints{JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint:8", "endpoint:9"},
}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:8") expectEndpoint(t, loadBalancer, "foo", "endpoint:8")
expectEndpoint(t, loadBalancer, "foo", "endpoint:9") expectEndpoint(t, loadBalancer, "foo", "endpoint:9")
expectEndpoint(t, loadBalancer, "foo", "endpoint:8") expectEndpoint(t, loadBalancer, "foo", "endpoint:8")
expectEndpoint(t, loadBalancer, "foo", "endpoint:9") expectEndpoint(t, loadBalancer, "foo", "endpoint:9")
// Clear endpoints // Clear endpoints
endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{}} endpoints[0] = api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.LoadBalance("foo", nil) endpoint, err = loadBalancer.LoadBalance("foo", nil)
@ -149,8 +160,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 2) endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}} endpoints[0] = api.Endpoints{
endpoints[1] = api.Endpoints{Name: "bar", Endpoints: []string{"endpoint:4", "endpoint:5"}} JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
}
endpoints[1] = api.Endpoints{
JSONBase: api.JSONBase{ID: "bar"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1") expectEndpoint(t, loadBalancer, "foo", "endpoint:1")
expectEndpoint(t, loadBalancer, "foo", "endpoint:2") expectEndpoint(t, loadBalancer, "foo", "endpoint:2")

View File

@ -88,7 +88,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
} }
err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{ err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{
Name: service.ID, JSONBase: api.JSONBase{ID: service.ID},
Endpoints: endpoints, Endpoints: endpoints,
}) })
if err != nil { if err != nil {

View File

@ -112,9 +112,10 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
} }
contKey := makeContainerKey(machine) contKey := makeContainerKey(machine)
err = registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { err = registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*[]api.ContainerManifest) manifests := *in.(*api.ContainerManifestList)
return append(manifests, manifest), nil manifests.Items = append(manifests.Items, manifest)
return manifests, nil
}) })
if err != nil { if err != nil {
// Don't strand stuff. // Don't strand stuff.
@ -153,11 +154,11 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
// Next, remove the pod from the machine atomically. // Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine) contKey := makeContainerKey(machine)
return registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { return registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*[]api.ContainerManifest) manifests := in.(*api.ContainerManifestList)
newManifests := make([]api.ContainerManifest, 0, len(manifests)) newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
found := false found := false
for _, manifest := range manifests { for _, manifest := range manifests.Items {
if manifest.ID != podID { if manifest.ID != podID {
newManifests = append(newManifests, manifest) newManifests = append(newManifests, manifest)
} else { } else {
@ -170,7 +171,8 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
// However it is "deleted" so log it and move on // However it is "deleted" so log it and move on
glog.Infof("Couldn't find: %s in %#v", podID, manifests) glog.Infof("Couldn't find: %s in %#v", podID, manifests)
} }
return newManifests, nil manifests.Items = newManifests
return manifests, nil
}) })
} }
@ -297,5 +299,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
// UpdateEndpoints update Endpoints of a Service. // UpdateEndpoints update Endpoints of a Service.
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
return registry.helper().SetObj("/registry/services/endpoints/"+e.Name, e) return registry.helper().SetObj("/registry/services/endpoints/"+e.ID, e)
} }

View File

@ -17,7 +17,6 @@ limitations under the License.
package registry package registry
import ( import (
"encoding/json"
"reflect" "reflect"
"testing" "testing"
@ -70,7 +69,7 @@ func TestEtcdCreatePod(t *testing.T) {
}, },
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreatePod("machine", api.Pod{ err := registry.CreatePod("machine", api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
@ -88,18 +87,20 @@ func TestEtcdCreatePod(t *testing.T) {
}) })
expectNoError(t, err) expectNoError(t, err)
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
expectNoError(t, err) if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod var pod api.Pod
err = json.Unmarshal([]byte(resp.Node.Value), &pod) err = api.DecodeInto([]byte(resp.Node.Value), &pod)
expectNoError(t, err) expectNoError(t, err)
if pod.ID != "foo" { if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
} }
var manifests []api.ContainerManifest var manifests api.ContainerManifestList
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
expectNoError(t, err) expectNoError(t, err)
err = json.Unmarshal([]byte(resp.Node.Value), &manifests) err = api.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests) != 1 || manifests[0].ID != "foo" { if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests) t.Errorf("Unexpected manifest list: %#v", manifests)
} }
} }
@ -189,18 +190,20 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
}) })
expectNoError(t, err) expectNoError(t, err)
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
expectNoError(t, err) if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod var pod api.Pod
err = json.Unmarshal([]byte(resp.Node.Value), &pod) err = api.DecodeInto([]byte(resp.Node.Value), &pod)
expectNoError(t, err) expectNoError(t, err)
if pod.ID != "foo" { if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
} }
var manifests []api.ContainerManifest var manifests api.ContainerManifestList
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
expectNoError(t, err) expectNoError(t, err)
err = json.Unmarshal([]byte(resp.Node.Value), &manifests) err = api.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests) != 1 || manifests[0].ID != "foo" { if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests) t.Errorf("Unexpected manifest list: %#v", manifests)
} }
} }
@ -213,9 +216,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
}, },
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(api.ContainerManifestList{
{ Items: []api.ContainerManifest{
ID: "bar", {ID: "bar"},
}, },
}), 0) }), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
@ -236,18 +239,20 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
}) })
expectNoError(t, err) expectNoError(t, err)
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
expectNoError(t, err) if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod var pod api.Pod
err = json.Unmarshal([]byte(resp.Node.Value), &pod) err = api.DecodeInto([]byte(resp.Node.Value), &pod)
expectNoError(t, err) expectNoError(t, err)
if pod.ID != "foo" { if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
} }
var manifests []api.ContainerManifest var manifests api.ContainerManifestList
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
expectNoError(t, err) expectNoError(t, err)
err = json.Unmarshal([]byte(resp.Node.Value), &manifests) err = api.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests) != 2 || manifests[1].ID != "foo" { if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests) t.Errorf("Unexpected manifest list: %#v", manifests)
} }
} }
@ -256,9 +261,9 @@ func TestEtcdDeletePod(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t) fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo" key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
{ Items: []api.ContainerManifest{
ID: "foo", {ID: "foo"},
}, },
}), 0) }), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
@ -269,8 +274,13 @@ func TestEtcdDeletePod(t *testing.T) {
} else if fakeClient.DeletedKeys[0] != key { } else if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if response.Node.Value != "[]" { if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
api.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value) t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
} }
} }
@ -279,9 +289,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t) fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo" key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
{ID: "foo"}, Items: []api.ContainerManifest{
{ID: "bar"}, {ID: "foo"},
{ID: "bar"},
},
}), 0) }), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeletePod("foo") err := registry.DeletePod("foo")
@ -292,13 +304,16 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
if fakeClient.DeletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
var manifests []api.ContainerManifest if err != nil {
json.Unmarshal([]byte(response.Node.Value), &manifests) t.Fatalf("Unexpected error %v", err)
if len(manifests) != 1 {
t.Errorf("Unexpected manifest set: %#v, expected empty", manifests)
} }
if manifests[0].ID != "bar" { var manifests api.ContainerManifestList
api.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 1 {
t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests)
}
if manifests.Items[0].ID != "bar" {
t.Errorf("Deleted wrong manifest: %#v", manifests) t.Errorf("Deleted wrong manifest: %#v", manifests)
} }
} }
@ -476,9 +491,11 @@ func TestEtcdCreateController(t *testing.T) {
}) })
expectNoError(t, err) expectNoError(t, err)
resp, err := fakeClient.Get("/registry/controllers/foo", false, false) resp, err := fakeClient.Get("/registry/controllers/foo", false, false)
expectNoError(t, err) if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var ctrl api.ReplicationController var ctrl api.ReplicationController
err = json.Unmarshal([]byte(resp.Node.Value), &ctrl) err = api.DecodeInto([]byte(resp.Node.Value), &ctrl)
expectNoError(t, err) expectNoError(t, err)
if ctrl.ID != "foo" { if ctrl.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", ctrl, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", ctrl, resp.Node.Value)
@ -544,7 +561,7 @@ func TestEtcdCreateService(t *testing.T) {
resp, err := fakeClient.Get("/registry/services/specs/foo", false, false) resp, err := fakeClient.Get("/registry/services/specs/foo", false, false)
expectNoError(t, err) expectNoError(t, err)
var service api.Service var service api.Service
err = json.Unmarshal([]byte(resp.Node.Value), &service) err = api.DecodeInto([]byte(resp.Node.Value), &service)
expectNoError(t, err) expectNoError(t, err)
if service.ID != "foo" { if service.ID != "foo" {
t.Errorf("Unexpected service: %#v %s", service, resp.Node.Value) t.Errorf("Unexpected service: %#v %s", service, resp.Node.Value)
@ -621,15 +638,17 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t) fakeClient := tools.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints := api.Endpoints{ endpoints := api.Endpoints{
Name: "foo", JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"baz", "bar"}, Endpoints: []string{"baz", "bar"},
} }
err := registry.UpdateEndpoints(endpoints) err := registry.UpdateEndpoints(endpoints)
expectNoError(t, err) expectNoError(t, err)
response, err := fakeClient.Get("/registry/services/endpoints/foo", false, false) response, err := fakeClient.Get("/registry/services/endpoints/foo", false, false)
expectNoError(t, err) if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var endpointsOut api.Endpoints var endpointsOut api.Endpoints
err = json.Unmarshal([]byte(response.Node.Value), &endpointsOut) err = api.DecodeInto([]byte(response.Node.Value), &endpointsOut)
if !reflect.DeepEqual(endpoints, endpointsOut) { if !reflect.DeepEqual(endpoints, endpointsOut) {
t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints) t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints)
} }