mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Merge pull request #756 from smarterclayton/expose_encoding_versioning_interfaces
Expose an Encoding/Versioning interface for use with etcd
This commit is contained in:
commit
27426db827
@ -25,6 +25,20 @@ import (
|
||||
"gopkg.in/v1/yaml"
|
||||
)
|
||||
|
||||
type EncodingInterface interface {
|
||||
Encode(obj interface{}) (data []byte, err error)
|
||||
Decode(data []byte) (interface{}, error)
|
||||
DecodeInto(data []byte, obj interface{}) error
|
||||
}
|
||||
|
||||
type VersioningInterface interface {
|
||||
SetResourceVersion(obj interface{}, version uint64) error
|
||||
ResourceVersion(obj interface{}) (uint64, error)
|
||||
}
|
||||
|
||||
var Encoding EncodingInterface
|
||||
var Versioning VersioningInterface
|
||||
|
||||
var conversionScheme *conversion.Scheme
|
||||
|
||||
func init() {
|
||||
@ -86,6 +100,9 @@ func init() {
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
Encoding = conversionScheme
|
||||
Versioning = JSONBaseVersioning{}
|
||||
}
|
||||
|
||||
// AddKnownTypes registers the types of the arguments to the marshaller of the package api.
|
||||
|
@ -21,6 +21,26 @@ import (
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// versionedJSONBase allows access to the version state of a JSONBase object
|
||||
type JSONBaseVersioning struct{}
|
||||
|
||||
func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) {
|
||||
json, err := FindJSONBaseRO(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return json.ResourceVersion, nil
|
||||
}
|
||||
|
||||
func (v JSONBaseVersioning) SetResourceVersion(obj interface{}, version uint64) error {
|
||||
json, err := FindJSONBase(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
json.SetResourceVersion(version)
|
||||
return nil
|
||||
}
|
||||
|
||||
// JSONBase lets you work with a JSONBase from any of the versioned or
|
||||
// internal APIObjects.
|
||||
type JSONBaseInterface interface {
|
||||
|
@ -57,3 +57,35 @@ func TestGenericJSONBase(t *testing.T) {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersioningOfAPI(t *testing.T) {
|
||||
type T struct {
|
||||
Object interface{}
|
||||
Expected uint64
|
||||
}
|
||||
testCases := map[string]T{
|
||||
"empty api object": {Service{}, 0},
|
||||
"api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
|
||||
"pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
|
||||
}
|
||||
versioning := JSONBaseVersioning{}
|
||||
for key, testCase := range testCases {
|
||||
actual, err := versioning.ResourceVersion(testCase.Object)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error %#v", key, err)
|
||||
}
|
||||
if actual != testCase.Expected {
|
||||
t.Errorf("%s: expected %d, got %d", key, testCase.Expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
failingCases := map[string]T{
|
||||
"not a valid object to try": {JSONBase{ResourceVersion: 1}, 1},
|
||||
}
|
||||
for key, testCase := range failingCases {
|
||||
_, err := versioning.ResourceVersion(testCase.Object)
|
||||
if err == nil {
|
||||
t.Errorf("%s: expected error, got nil", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ import (
|
||||
// TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references
|
||||
// from this file completely.
|
||||
type ReplicationManager struct {
|
||||
etcdClient tools.EtcdClient
|
||||
etcdHelper tools.EtcdHelper
|
||||
kubeClient client.Interface
|
||||
podControl PodControlInterface
|
||||
syncTime <-chan time.Time
|
||||
@ -84,7 +84,7 @@ func (r RealPodControl) deletePod(podID string) error {
|
||||
func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Interface) *ReplicationManager {
|
||||
rm := &ReplicationManager{
|
||||
kubeClient: kubeClient,
|
||||
etcdClient: etcdClient,
|
||||
etcdHelper: tools.EtcdHelper{etcdClient, api.Encoding, api.Versioning},
|
||||
podControl: RealPodControl{
|
||||
kubeClient: kubeClient,
|
||||
},
|
||||
@ -102,8 +102,7 @@ func (rm *ReplicationManager) Run(period time.Duration) {
|
||||
|
||||
// makeEtcdWatch starts watching via etcd.
|
||||
func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) {
|
||||
helper := tools.EtcdHelper{rm.etcdClient}
|
||||
return helper.WatchList("/registry/controllers", tools.Everything)
|
||||
return rm.etcdHelper.WatchList("/registry/controllers", tools.Everything)
|
||||
}
|
||||
|
||||
// makeAPIWatch starts watching via the apiserver.
|
||||
@ -192,8 +191,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
|
||||
|
||||
func (rm *ReplicationManager) synchronize() {
|
||||
var controllerSpecs []api.ReplicationController
|
||||
helper := tools.EtcdHelper{rm.etcdClient}
|
||||
err := helper.ExtractList("/registry/controllers", &controllerSpecs)
|
||||
err := rm.etcdHelper.ExtractList("/registry/controllers", &controllerSpecs)
|
||||
if err != nil {
|
||||
glog.Errorf("Synchronization error: %v (%#v)", err, err)
|
||||
return
|
||||
|
@ -32,7 +32,8 @@ import (
|
||||
|
||||
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
|
||||
type EtcdRegistry struct {
|
||||
etcdClient tools.EtcdClient
|
||||
client tools.EtcdClient
|
||||
helper tools.EtcdHelper
|
||||
machines MinionRegistry
|
||||
manifestFactory ManifestFactory
|
||||
}
|
||||
@ -43,8 +44,9 @@ type EtcdRegistry struct {
|
||||
// 'scheduler' is the scheduling algorithm to use.
|
||||
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
|
||||
registry := &EtcdRegistry{
|
||||
etcdClient: client,
|
||||
machines: machines,
|
||||
client: client,
|
||||
helper: tools.EtcdHelper{client, api.Encoding, api.Versioning},
|
||||
machines: machines,
|
||||
}
|
||||
registry.manifestFactory = &BasicManifestFactory{
|
||||
serviceRegistry: registry,
|
||||
@ -56,10 +58,6 @@ func makePodKey(machine, podID string) string {
|
||||
return "/registry/hosts/" + machine + "/pods/" + podID
|
||||
}
|
||||
|
||||
func (registry *EtcdRegistry) helper() *tools.EtcdHelper {
|
||||
return &tools.EtcdHelper{registry.etcdClient}
|
||||
}
|
||||
|
||||
// ListPods obtains a list of pods that match selector.
|
||||
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
|
||||
pods := []api.Pod{}
|
||||
@ -69,7 +67,7 @@ func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, err
|
||||
}
|
||||
for _, machine := range machines {
|
||||
var machinePods []api.Pod
|
||||
err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
|
||||
err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
|
||||
if err != nil {
|
||||
return pods, err
|
||||
}
|
||||
@ -105,7 +103,7 @@ func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
||||
|
||||
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
||||
podKey := makePodKey(machine, pod.ID)
|
||||
err := registry.helper().SetObj(podKey, pod)
|
||||
err := registry.helper.SetObj(podKey, pod)
|
||||
|
||||
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
|
||||
if err != nil {
|
||||
@ -113,14 +111,14 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
||||
}
|
||||
|
||||
contKey := makeContainerKey(machine)
|
||||
err = registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
||||
err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
||||
manifests := *in.(*api.ContainerManifestList)
|
||||
manifests.Items = append(manifests.Items, manifest)
|
||||
return manifests, nil
|
||||
})
|
||||
if err != nil {
|
||||
// Don't strand stuff.
|
||||
_, err2 := registry.etcdClient.Delete(podKey, false)
|
||||
_, err2 := registry.client.Delete(podKey, false)
|
||||
if err2 != nil {
|
||||
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
|
||||
}
|
||||
@ -145,7 +143,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
||||
// First delete the pod, so a scheduler doesn't notice it getting removed from the
|
||||
// machine and attempt to put it somewhere.
|
||||
podKey := makePodKey(machine, podID)
|
||||
_, err := registry.etcdClient.Delete(podKey, true)
|
||||
_, err := registry.client.Delete(podKey, true)
|
||||
if tools.IsEtcdNotFound(err) {
|
||||
return apiserver.NewNotFoundErr("pod", podID)
|
||||
}
|
||||
@ -155,7 +153,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
||||
|
||||
// Next, remove the pod from the machine atomically.
|
||||
contKey := makeContainerKey(machine)
|
||||
return registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
||||
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
||||
manifests := in.(*api.ContainerManifestList)
|
||||
newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
|
||||
found := false
|
||||
@ -179,7 +177,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
||||
|
||||
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
||||
key := makePodKey(machine, podID)
|
||||
err = registry.helper().ExtractObj(key, &pod, false)
|
||||
err = registry.helper.ExtractObj(key, &pod, false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -204,14 +202,14 @@ func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
|
||||
// ListControllers obtains a list of ReplicationControllers.
|
||||
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
|
||||
var controllers []api.ReplicationController
|
||||
err := registry.helper().ExtractList("/registry/controllers", &controllers)
|
||||
err := registry.helper.ExtractList("/registry/controllers", &controllers)
|
||||
return controllers, err
|
||||
}
|
||||
|
||||
// WatchControllers begins watching for new, changed, or deleted controllers.
|
||||
// TODO: Add id/selector parameters?
|
||||
func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) {
|
||||
return registry.helper().WatchList("/registry/controllers", tools.Everything)
|
||||
return registry.helper.WatchList("/registry/controllers", tools.Everything)
|
||||
}
|
||||
|
||||
func makeControllerKey(id string) string {
|
||||
@ -222,7 +220,7 @@ func makeControllerKey(id string) string {
|
||||
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
||||
var controller api.ReplicationController
|
||||
key := makeControllerKey(controllerID)
|
||||
err := registry.helper().ExtractObj(key, &controller, false)
|
||||
err := registry.helper.ExtractObj(key, &controller, false)
|
||||
if tools.IsEtcdNotFound(err) {
|
||||
return nil, apiserver.NewNotFoundErr("replicationController", controllerID)
|
||||
}
|
||||
@ -240,13 +238,13 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl
|
||||
|
||||
// UpdateController replaces an existing ReplicationController.
|
||||
func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error {
|
||||
return registry.helper().SetObj(makeControllerKey(controller.ID), controller)
|
||||
return registry.helper.SetObj(makeControllerKey(controller.ID), controller)
|
||||
}
|
||||
|
||||
// DeleteController deletes a ReplicationController specified by its ID.
|
||||
func (registry *EtcdRegistry) DeleteController(controllerID string) error {
|
||||
key := makeControllerKey(controllerID)
|
||||
_, err := registry.etcdClient.Delete(key, false)
|
||||
_, err := registry.client.Delete(key, false)
|
||||
if tools.IsEtcdNotFound(err) {
|
||||
return apiserver.NewNotFoundErr("replicationController", controllerID)
|
||||
}
|
||||
@ -260,20 +258,20 @@ func makeServiceKey(name string) string {
|
||||
// ListServices obtains a list of Services.
|
||||
func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) {
|
||||
var list api.ServiceList
|
||||
err := registry.helper().ExtractList("/registry/services/specs", &list.Items)
|
||||
err := registry.helper.ExtractList("/registry/services/specs", &list.Items)
|
||||
return list, err
|
||||
}
|
||||
|
||||
// CreateService creates a new Service.
|
||||
func (registry *EtcdRegistry) CreateService(svc api.Service) error {
|
||||
return registry.helper().SetObj(makeServiceKey(svc.ID), svc)
|
||||
return registry.helper.SetObj(makeServiceKey(svc.ID), svc)
|
||||
}
|
||||
|
||||
// GetService obtains a Service specified by its name.
|
||||
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
||||
key := makeServiceKey(name)
|
||||
var svc api.Service
|
||||
err := registry.helper().ExtractObj(key, &svc, false)
|
||||
err := registry.helper.ExtractObj(key, &svc, false)
|
||||
if tools.IsEtcdNotFound(err) {
|
||||
return nil, apiserver.NewNotFoundErr("service", name)
|
||||
}
|
||||
@ -290,7 +288,7 @@ func makeServiceEndpointsKey(name string) string {
|
||||
// DeleteService deletes a Service specified by its name.
|
||||
func (registry *EtcdRegistry) DeleteService(name string) error {
|
||||
key := makeServiceKey(name)
|
||||
_, err := registry.etcdClient.Delete(key, true)
|
||||
_, err := registry.client.Delete(key, true)
|
||||
if tools.IsEtcdNotFound(err) {
|
||||
return apiserver.NewNotFoundErr("service", name)
|
||||
}
|
||||
@ -298,7 +296,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error {
|
||||
return err
|
||||
}
|
||||
key = makeServiceEndpointsKey(name)
|
||||
_, err = registry.etcdClient.Delete(key, true)
|
||||
_, err = registry.client.Delete(key, true)
|
||||
if !tools.IsEtcdNotFound(err) {
|
||||
return err
|
||||
}
|
||||
@ -313,5 +311,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
|
||||
|
||||
// UpdateEndpoints update Endpoints of a Service.
|
||||
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
|
||||
return registry.helper().SetObj("/registry/services/endpoints/"+e.ID, e)
|
||||
return registry.helper.SetObj("/registry/services/endpoints/"+e.ID, e)
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
@ -42,6 +41,17 @@ var (
|
||||
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
||||
)
|
||||
|
||||
type Encoding interface {
|
||||
Encode(obj interface{}) (data []byte, err error)
|
||||
Decode(data []byte) (interface{}, error)
|
||||
DecodeInto(data []byte, obj interface{}) error
|
||||
}
|
||||
|
||||
type Versioning interface {
|
||||
SetResourceVersion(obj interface{}, version uint64) error
|
||||
ResourceVersion(obj interface{}) (uint64, error)
|
||||
}
|
||||
|
||||
// EtcdClient is an injectable interface for testing.
|
||||
type EtcdClient interface {
|
||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||
@ -65,7 +75,10 @@ type EtcdGetSet interface {
|
||||
|
||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||
type EtcdHelper struct {
|
||||
Client EtcdGetSet
|
||||
Client EtcdGetSet
|
||||
Encoding Encoding
|
||||
// optional
|
||||
Versioning Versioning
|
||||
}
|
||||
|
||||
// Returns true iff err is an etcd not found error.
|
||||
@ -116,7 +129,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
|
||||
v := pv.Elem()
|
||||
for _, node := range nodes {
|
||||
obj := reflect.New(v.Type().Elem())
|
||||
err = api.DecodeInto([]byte(node.Value), obj.Interface())
|
||||
err = h.Encoding.DecodeInto([]byte(node.Value), obj.Interface())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -150,12 +163,10 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
|
||||
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
|
||||
}
|
||||
body = response.Node.Value
|
||||
err = api.DecodeInto([]byte(body), objPtr)
|
||||
if jsonBase, err := api.FindJSONBase(objPtr); err == nil {
|
||||
jsonBase.SetResourceVersion(response.Node.ModifiedIndex)
|
||||
// Note that err shadows the err returned below, so we won't
|
||||
// return an error just because we failed to find a JSONBase.
|
||||
// This is intentional.
|
||||
err = h.Encoding.DecodeInto([]byte(body), objPtr)
|
||||
if h.Versioning != nil {
|
||||
_ = h.Versioning.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
}
|
||||
return body, response.Node.ModifiedIndex, err
|
||||
}
|
||||
@ -163,13 +174,15 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
|
||||
// SetObj marshals obj via json, and stores under key. Will do an
|
||||
// atomic update if obj's ResourceVersion field is set.
|
||||
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
|
||||
data, err := api.Encode(obj)
|
||||
data, err := h.Encoding.Encode(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 {
|
||||
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion)
|
||||
return err // err is shadowed!
|
||||
if h.Versioning != nil {
|
||||
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 {
|
||||
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
|
||||
return err // err is shadowed!
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: when client supports atomic creation, integrate this with the above.
|
||||
@ -186,7 +199,7 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error)
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// h := &util.EtcdHelper{client}
|
||||
// h := &util.EtcdHelper{client, encoding, versioning}
|
||||
// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) {
|
||||
// // Before this function is called, currentObj has been reset to etcd's current
|
||||
// // contents for "myKey".
|
||||
@ -225,7 +238,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
|
||||
return h.SetObj(key, ret)
|
||||
}
|
||||
|
||||
data, err := api.Encode(ret)
|
||||
data, err := h.Encoding.Encode(ret)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -250,7 +263,7 @@ func Everything(interface{}) bool {
|
||||
// API objects, and any items passing 'filter' are sent down the returned
|
||||
// watch.Interface.
|
||||
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(true, filter)
|
||||
w := newEtcdWatcher(true, filter, h.Encoding)
|
||||
go w.etcdWatch(h.Client, key)
|
||||
return w, nil
|
||||
}
|
||||
@ -258,13 +271,15 @@ func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface,
|
||||
// Watch begins watching the specified key. Events are decoded into
|
||||
// API objects and sent down the returned watch.Interface.
|
||||
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(false, nil)
|
||||
w := newEtcdWatcher(false, nil, h.Encoding)
|
||||
go w.etcdWatch(h.Client, key)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||
type etcdWatcher struct {
|
||||
encoding Encoding
|
||||
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
filter FilterFunc
|
||||
|
||||
@ -282,8 +297,9 @@ type etcdWatcher struct {
|
||||
}
|
||||
|
||||
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
|
||||
func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
|
||||
func newEtcdWatcher(list bool, filter FilterFunc, encoding Encoding) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
list: list,
|
||||
filter: filter,
|
||||
etcdIncoming: make(chan *etcd.Response),
|
||||
@ -358,7 +374,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
||||
return
|
||||
}
|
||||
|
||||
obj, err := api.Decode(data)
|
||||
obj, err := w.encoding.Decode(data)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
|
@ -23,12 +23,13 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
type fakeEtcdGetSet struct {
|
||||
type fakeClientGetSet struct {
|
||||
get func(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
set func(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
}
|
||||
@ -38,9 +39,15 @@ type TestResource struct {
|
||||
Value int `json:"value" yaml:"value,omitempty"`
|
||||
}
|
||||
|
||||
var scheme *conversion.Scheme
|
||||
var encoding = api.Encoding
|
||||
var versioning = api.Versioning
|
||||
|
||||
func init() {
|
||||
api.AddKnownTypes("", TestResource{})
|
||||
api.AddKnownTypes("v1beta1", TestResource{})
|
||||
scheme = conversion.NewScheme()
|
||||
scheme.ExternalVersion = "v1beta1"
|
||||
scheme.AddKnownTypes("", TestResource{})
|
||||
scheme.AddKnownTypes("v1beta1", TestResource{})
|
||||
}
|
||||
|
||||
func TestIsNotFoundErr(t *testing.T) {
|
||||
@ -80,7 +87,7 @@ func TestExtractList(t *testing.T) {
|
||||
{JSONBase: api.JSONBase{ID: "baz"}},
|
||||
}
|
||||
var got []api.Pod
|
||||
helper := EtcdHelper{fakeClient}
|
||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
||||
err := helper.ExtractList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
@ -94,7 +101,7 @@ func TestExtractObj(t *testing.T) {
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
fakeClient.Set("/some/key", util.MakeJSONString(expect), 0)
|
||||
helper := EtcdHelper{fakeClient}
|
||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj("/some/key", &got, false)
|
||||
if err != nil {
|
||||
@ -127,7 +134,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
helper := EtcdHelper{fakeClient}
|
||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
||||
try := func(key string) {
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj(key, &got, false)
|
||||
@ -148,12 +155,60 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
func TestSetObj(t *testing.T) {
|
||||
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient}
|
||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
||||
err := helper.SetObj("/some/key", obj)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
data, err := api.Encode(obj)
|
||||
data, err := encoding.Encode(obj)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
expect := string(data)
|
||||
got := fakeClient.Data["/some/key"].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetObjWithVersion(t *testing.T) {
|
||||
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}}
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Value: api.EncodeOrDie(obj),
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
||||
err := helper.SetObj("/some/key", obj)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %#v", err)
|
||||
}
|
||||
data, err := encoding.Encode(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %#v", err)
|
||||
}
|
||||
expect := string(data)
|
||||
got := fakeClient.Data["/some/key"].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetObjWithoutVersioning(t *testing.T) {
|
||||
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, encoding, nil}
|
||||
err := helper.SetObj("/some/key", obj)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
data, err := encoding.Encode(obj)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
@ -167,7 +222,8 @@ func TestSetObj(t *testing.T) {
|
||||
func TestAtomicUpdate(t *testing.T) {
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient}
|
||||
encoding := scheme
|
||||
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
@ -178,7 +234,7 @@ func TestAtomicUpdate(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
data, err := api.Encode(obj)
|
||||
data, err := encoding.Encode(obj)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
@ -204,7 +260,7 @@ func TestAtomicUpdate(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
data, err = api.Encode(objUpdate)
|
||||
data, err = encoding.Encode(objUpdate)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
@ -223,9 +279,9 @@ func TestWatchInterpretation_ListAdd(t *testing.T) {
|
||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
}, encoding)
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
podBytes, _ := encoding.Encode(pod)
|
||||
|
||||
go w.sendResult(&etcd.Response{
|
||||
Action: "set",
|
||||
@ -247,9 +303,9 @@ func TestWatchInterpretation_Delete(t *testing.T) {
|
||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
}, encoding)
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
podBytes, _ := encoding.Encode(pod)
|
||||
|
||||
go w.sendResult(&etcd.Response{
|
||||
Action: "delete",
|
||||
@ -271,7 +327,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
}, encoding)
|
||||
w.emit = func(e watch.Event) {
|
||||
t.Errorf("Unexpected emit: %v", e)
|
||||
}
|
||||
@ -285,7 +341,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
}, encoding)
|
||||
w.emit = func(e watch.Event) {
|
||||
t.Errorf("Unexpected emit: %v", e)
|
||||
}
|
||||
@ -298,7 +354,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||
t.Errorf("unexpected filter call")
|
||||
return true
|
||||
})
|
||||
}, encoding)
|
||||
w.emit = func(e watch.Event) {
|
||||
t.Errorf("Unexpected emit: %v", e)
|
||||
}
|
||||
@ -311,20 +367,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
fakeEtcd := MakeFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeEtcd}
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeClient, encoding, versioning}
|
||||
|
||||
watching, err := h.Watch("/some/key")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
fakeEtcd.WaitForWatchCompletion()
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
// Test normal case
|
||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
podBytes, _ := api.Encode(pod)
|
||||
fakeEtcd.WatchResponse <- &etcd.Response{
|
||||
podBytes, _ := encoding.Encode(pod)
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(podBytes),
|
||||
@ -344,10 +400,10 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test error case
|
||||
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
|
||||
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
|
||||
|
||||
// Did everything shut down?
|
||||
if _, open := <-fakeEtcd.WatchResponse; open {
|
||||
if _, open := <-fakeClient.WatchResponse; open {
|
||||
t.Errorf("An injected error did not cause a graceful shutdown")
|
||||
}
|
||||
if _, open := <-watching.ResultChan(); open {
|
||||
@ -356,19 +412,19 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||
fakeEtcd := MakeFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeEtcd}
|
||||
fakeClient := MakeFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeClient, encoding, versioning}
|
||||
|
||||
// Test purposeful shutdown
|
||||
watching, err := h.Watch("/some/key")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
fakeEtcd.WaitForWatchCompletion()
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
watching.Stop()
|
||||
|
||||
// Did everything shut down?
|
||||
if _, open := <-fakeEtcd.WatchResponse; open {
|
||||
if _, open := <-fakeClient.WatchResponse; open {
|
||||
t.Errorf("A stop did not cause a graceful shutdown")
|
||||
}
|
||||
if _, open := <-watching.ResultChan(); open {
|
||||
|
Loading…
Reference in New Issue
Block a user