Merge pull request #307 from lavalamp/atomic

All PUTs now atomic
This commit is contained in:
brendandburns 2014-07-02 16:31:35 -07:00
commit 0b9f36b761
18 changed files with 333 additions and 101 deletions

View File

@ -23,7 +23,9 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"runtime"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -128,8 +130,73 @@ func runReplicationControllerTest(kubeClient *client.Client) {
glog.Infof("Replication controller produced:\n\n%#v\n\n", pods)
}
func runAtomicPutTest(c *client.Client) {
var svc api.Service
err := c.Post().Path("services").Body(
api.Service{
JSONBase: api.JSONBase{ID: "atomicService"},
Port: 12345,
Labels: map[string]string{
"name": "atomicService",
},
},
).Do().Into(&svc)
if err != nil {
glog.Fatalf("Failed creating atomicService: %v", err)
}
testLabels := labels.Set{}
for i := 0; i < 26; i++ {
// a: z, b: y, etc...
testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)})
}
var wg sync.WaitGroup
wg.Add(len(testLabels))
for label, value := range testLabels {
go func(l, v string) {
for {
var tmpSvc api.Service
err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc)
if err != nil {
glog.Errorf("Error getting atomicService: %v", err)
continue
}
if tmpSvc.Selector == nil {
tmpSvc.Selector = map[string]string{l: v}
} else {
tmpSvc.Selector[l] = v
}
err = c.Put().Path("services").Path(svc.ID).Body(&tmpSvc).Do().Error()
if err != nil {
if se, ok := err.(*client.StatusErr); ok {
if se.Status.Code == http.StatusConflict {
// This is what we expect.
continue
}
}
glog.Errorf("Unexpected error putting atomicService: %v", err)
continue
}
break
}
wg.Done()
}(label, value)
}
wg.Wait()
err = c.Get().Path("services").Path(svc.ID).Do().Into(&svc)
if err != nil {
glog.Fatalf("Failed getting atomicService after writers are complete: %v", err)
}
if !reflect.DeepEqual(testLabels, labels.Set(svc.Selector)) {
glog.Fatalf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, svc.Selector)
}
glog.Info("Atomic PUTs work.")
}
type testFunc func(*client.Client)
func main() {
runtime.GOMAXPROCS(4)
runtime.GOMAXPROCS(runtime.NumCPU())
util.ReallyCrash = true
util.InitLogs()
defer util.FlushLogs()
@ -150,7 +217,22 @@ func main() {
time.Sleep(time.Second * 10)
kubeClient := client.New(apiServerURL, nil)
runReplicationControllerTest(kubeClient)
// Run tests in parallel
testFuncs := []testFunc{
runReplicationControllerTest,
runAtomicPutTest,
}
var wg sync.WaitGroup
wg.Add(len(testFuncs))
for i := range testFuncs {
f := testFuncs[i]
go func() {
f(kubeClient)
wg.Done()
}()
}
wg.Wait()
// Check that kubelet tried to make the pods.
// Using a set to list unique creation attempts. Our fake is

View File

@ -49,6 +49,30 @@ func AddKnownTypes(types ...interface{}) {
}
}
// Takes an arbitary api type, returns pointer to its JSONBase field.
// obj must be a pointer to an api type.
func FindJSONBase(obj interface{}) (*JSONBase, error) {
_, jsonBase, err := nameAndJSONBase(obj)
return jsonBase, err
}
// Takes an arbitary api type, return a copy of its JSONBase field.
// obj may be a pointer to an api type, or a non-pointer struct api type.
func FindJSONBaseRO(obj interface{}) (JSONBase, error) {
v := reflect.ValueOf(obj)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
if v.Kind() != reflect.Struct {
return JSONBase{}, fmt.Errorf("expected struct, but got %v", v.Type().Name())
}
jsonBase := v.FieldByName("JSONBase")
if !jsonBase.IsValid() {
return JSONBase{}, fmt.Errorf("struct %v lacks embedded JSON type", v.Type().Name())
}
return jsonBase.Interface().(JSONBase), nil
}
// Encode turns the given api object into an appropriate JSON string.
// Will return an error if the object doesn't have an embedded JSONBase.
// Obj may be a pointer to a struct, or a struct. If a struct, a copy

View File

@ -150,6 +150,7 @@ type JSONBase struct {
ID string `json:"id,omitempty" yaml:"id,omitempty"`
CreationTimestamp string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"`
ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
}
type PodStatus string
@ -266,8 +267,12 @@ type Status struct {
// One of: "success", "failure", "working" (for operations not yet completed)
// TODO: if "working", include an operation identifier so final status can be
// checked.
Status string `json:"status,omitempty" yaml:"status,omitempty"`
Status string `json:"status,omitempty" yaml:"status,omitempty"`
// Details about the status. May be an error description or an
// operation number for later polling.
Details string `json:"details,omitempty" yaml:"details,omitempty"`
// Suggested HTTP return code for this status, 0 if not set.
Code int `json:"code,omitempty" yaml:"code,omitempty"`
}
// Values of Status.Status

View File

@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
@ -54,9 +55,15 @@ func MakeAsync(fn WorkFunc) <-chan interface{} {
defer util.HandleCrash()
obj, err := fn()
if err != nil {
status := http.StatusInternalServerError
switch {
case tools.IsEtcdConflict(err):
status = http.StatusConflict
}
channel <- &api.Status{
Status: api.StatusFailure,
Details: err.Error(),
Code: status,
}
} else {
channel <- obj
@ -110,9 +117,13 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
glog.Infof("ApiServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
}
}()
logger := MakeLogged(req, w)
w = logger
defer logger.Log()
defer MakeLogged(req, &w).StacktraceWhen(
StatusIsNot(
http.StatusOK,
http.StatusAccepted,
http.StatusConflict,
),
).Log()
url, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
server.error(err, w)
@ -141,7 +152,7 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
storage := server.storage[requestParts[0]]
if storage == nil {
logger.Addf("'%v' has no storage object", requestParts[0])
LogOf(w).Addf("'%v' has no storage object", requestParts[0])
server.notFound(req, w)
return
} else {
@ -171,8 +182,7 @@ func (server *ApiServer) error(err error, w http.ResponseWriter) {
func (server *ApiServer) readBody(req *http.Request) ([]byte, error) {
defer req.Body.Close()
body, err := ioutil.ReadAll(req.Body)
return body, err
return ioutil.ReadAll(req.Body)
}
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
@ -184,7 +194,19 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti
}
obj, complete := op.StatusOrResult()
if complete {
server.write(http.StatusOK, obj, w)
status := http.StatusOK
switch stat := obj.(type) {
case api.Status:
LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 {
status = stat.Code
}
case *api.Status:
if stat.Code != 0 {
status = stat.Code
}
}
server.write(status, obj, w)
} else {
server.write(http.StatusAccepted, obj, w)
}

View File

@ -25,6 +25,9 @@ import (
"github.com/golang/glog"
)
// Return true if a stacktrace should be logged for this status
type StacktracePred func(httpStatus int) (logStacktrace bool)
// Add a layer on top of ResponseWriter, so we can track latency and error
// message sources.
type respLogger struct {
@ -35,17 +38,64 @@ type respLogger struct {
req *http.Request
w http.ResponseWriter
logStacktracePred StacktracePred
}
func DefaultStacktracePred(status int) bool {
return status != http.StatusOK && status != http.StatusAccepted
}
// MakeLogged turns a normal response writer into a logged response writer.
//
// Usage:
// logger := MakeLogged(req, w)
// w = logger // Route response writing actions through w
// defer logger.Log()
func MakeLogged(req *http.Request, w http.ResponseWriter) *respLogger {
return &respLogger{
startTime: time.Now(),
req: req,
w: w,
//
// defer MakeLogged(req, &w).StacktraceWhen(StatusIsNot(200, 202)).Log()
//
// (Only the call to Log() is defered, so you can set everything up in one line!)
//
// Note that this *changes* your writer, to route response writing actions
// through the logger.
//
// Use LogOf(w).Addf(...) to log something along with the response result.
func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger {
rl := &respLogger{
startTime: time.Now(),
req: req,
w: *w,
logStacktracePred: DefaultStacktracePred,
}
*w = rl // hijack caller's writer!
return rl
}
// LogOf returns the logger hiding in w. Panics if there isn't such a logger,
// because MakeLogged() must have been previously called for the log to work.
func LogOf(w http.ResponseWriter) *respLogger {
if rl, ok := w.(*respLogger); ok {
return rl
}
panic("Logger not installed yet!")
return nil
}
// Sets the stacktrace logging predicate, which decides when to log a stacktrace.
// There's a default, so you don't need to call this unless you don't like the default.
func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger {
rl.logStacktracePred = pred
return rl
}
// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged
// for any status *not* in the given list.
func StatusIsNot(statuses ...int) StacktracePred {
return func(status int) bool {
for _, s := range statuses {
if status == s {
return false
}
}
return true
}
}
@ -73,7 +123,7 @@ func (rl *respLogger) Write(b []byte) (int, error) {
// Implement http.ResponseWriter
func (rl *respLogger) WriteHeader(status int) {
rl.status = status
if status != http.StatusOK && status != http.StatusAccepted {
if rl.logStacktracePred(status) {
// Only log stacks for errors
stack := make([]byte, 2048)
stack = stack[:runtime.Stack(stack, false)]

View File

@ -100,16 +100,27 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) {
if err != nil {
return body, err
}
if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent {
// Did the server give us a status response?
isStatusResponse := false
var status api.Status
if err := api.DecodeInto(body, &status); err == nil && status.Status != "" {
isStatusResponse = true
}
switch {
case response.StatusCode == http.StatusConflict:
// Return error given by server, if there was one.
if isStatusResponse {
return nil, &StatusErr{status}
}
fallthrough
case response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent:
return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body))
}
// If the server gave us a status back, look at what it was.
var status api.Status
if err := api.DecodeInto(body, &status); err == nil && status.Status != "" {
if status.Status == api.StatusSuccess {
return body, nil
}
if isStatusResponse && status.Status != api.StatusSuccess {
// "Working" requests need to be handled specially.
// "Failed" requests are clearly just an error and it makes sense to return them as such.
return nil, &StatusErr{status}

View File

@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
@ -34,7 +35,7 @@ import (
// with actual running pods.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
etcdClient util.EtcdClient
etcdClient tools.EtcdClient
kubeClient client.ClientInterface
podControl PodControlInterface
syncTime <-chan time.Time
@ -76,7 +77,7 @@ func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID)
}
func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager {
func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager {
rm := &ReplicationManager{
kubeClient: kubeClient,
etcdClient: etcdClient,
@ -201,7 +202,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
func (rm *ReplicationManager) synchronize() {
var controllerSpecs []api.ReplicationController
helper := util.EtcdHelper{rm.etcdClient}
helper := tools.EtcdHelper{rm.etcdClient}
err := helper.ExtractList("/registry/controllers", &controllerSpecs)
if err != nil {
glog.Errorf("Synchronization error: %v (%#v)", err, err)

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
@ -377,8 +378,8 @@ func TestSyncronize(t *testing.T) {
},
}
fakeEtcd := util.MakeFakeEtcdClient(t)
fakeEtcd.Data["/registry/controllers"] = util.EtcdResponseWithError{
fakeEtcd := tools.MakeFakeEtcdClient(t)
fakeEtcd.Data["/registry/controllers"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
@ -432,7 +433,7 @@ func (a *asyncTimeout) done() {
func TestWatchControllers(t *testing.T) {
defer beginTimeout(20 * time.Second).done()
fakeEtcd := util.MakeFakeEtcdClient(t)
fakeEtcd := tools.MakeFakeEtcdClient(t)
manager := MakeReplicationManager(fakeEtcd, nil)
var testControllerSpec api.ReplicationController
received := make(chan bool)

View File

@ -34,6 +34,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
@ -81,7 +82,7 @@ func New() *Kubelet {
// The main kubelet implementation
type Kubelet struct {
Hostname string
EtcdClient util.EtcdClient
EtcdClient tools.EtcdClient
DockerClient DockerInterface
DockerPuller DockerPuller
CadvisorClient CadvisorInterface
@ -520,7 +521,7 @@ func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.Container
func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
response, err := kl.EtcdClient.Get(key, true, false)
if err != nil {
if util.IsEtcdNotFound(err) {
if tools.IsEtcdNotFound(err) {
return nil
}
glog.Errorf("Error on etcd get of %s: %v", key, err)

View File

@ -46,9 +46,7 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) {
}
func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
logger := apiserver.MakeLogged(req, w)
w = logger
defer logger.Log()
defer apiserver.MakeLogged(req, &w).Log()
u, err := url.ParseRequestURI(req.RequestURI)
if err != nil {

View File

@ -26,6 +26,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
@ -75,8 +76,8 @@ func verifyError(t *testing.T, e error) {
}
}
func makeTestKubelet(t *testing.T) (*Kubelet, *util.FakeEtcdClient, *FakeDockerClient) {
fakeEtcdClient := util.MakeFakeEtcdClient(t)
func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDockerClient) {
fakeEtcdClient := tools.MakeFakeEtcdClient(t)
fakeDocker := &FakeDockerClient{
err: nil,
}
@ -279,7 +280,7 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: nil,
}
@ -298,7 +299,7 @@ func TestGetKubeletStateFromEtcd(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}),
@ -319,7 +320,7 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: 100,
@ -338,7 +339,7 @@ func TestGetKubeletStateFromEtcdError(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: 200, // non not found error

View File

@ -21,7 +21,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/golang/glog"
)
@ -30,7 +30,7 @@ import (
// EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd.
type EtcdRegistry struct {
etcdClient util.EtcdClient
etcdClient tools.EtcdClient
machines MinionRegistry
manifestFactory ManifestFactory
}
@ -39,7 +39,7 @@ type EtcdRegistry struct {
// 'client' is the connection to etcd
// 'machines' is the list of machines
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client util.EtcdClient, machines MinionRegistry) *EtcdRegistry {
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
etcdClient: client,
machines: machines,
@ -54,8 +54,8 @@ func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID
}
func (registry *EtcdRegistry) helper() *util.EtcdHelper {
return &util.EtcdHelper{registry.etcdClient}
func (registry *EtcdRegistry) helper() *tools.EtcdHelper {
return &tools.EtcdHelper{registry.etcdClient}
}
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {

View File

@ -23,11 +23,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry {
func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines))
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
@ -36,7 +37,7 @@ func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegist
}
func TestEtcdGetPod(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
pod, err := registry.GetPod("foo")
@ -47,8 +48,8 @@ func TestEtcdGetPod(t *testing.T) {
}
func TestEtcdGetPodNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -64,8 +65,8 @@ func TestEtcdGetPodNotFound(t *testing.T) {
}
func TestEtcdCreatePod(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -106,8 +107,8 @@ func TestEtcdCreatePod(t *testing.T) {
}
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
@ -127,14 +128,14 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
}
func TestEtcdCreatePodWithContainersError(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{ErrorCode: 100},
}
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -159,14 +160,14 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
}
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{ErrorCode: 100},
}
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -207,8 +208,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
}
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -254,7 +255,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
}
func TestEtcdDeletePod(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{
@ -277,7 +278,7 @@ func TestEtcdDeletePod(t *testing.T) {
}
func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{
@ -305,9 +306,9 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
}
func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{},
@ -324,9 +325,9 @@ func TestEtcdEmptyListPods(t *testing.T) {
}
func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100},
}
@ -339,9 +340,9 @@ func TestEtcdListPodsNotFound(t *testing.T) {
}
func TestEtcdListPods(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
@ -369,9 +370,9 @@ func TestEtcdListPods(t *testing.T) {
}
func TestEtcdListControllersNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/controllers"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100},
}
@ -384,9 +385,9 @@ func TestEtcdListControllersNotFound(t *testing.T) {
}
func TestEtcdListServicesNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/services/specs"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100},
}
@ -399,9 +400,9 @@ func TestEtcdListServicesNotFound(t *testing.T) {
}
func TestEtcdListControllers(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/controllers"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
@ -425,7 +426,7 @@ func TestEtcdListControllers(t *testing.T) {
}
func TestEtcdGetController(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
ctrl, err := registry.GetController("foo")
@ -436,8 +437,8 @@ func TestEtcdGetController(t *testing.T) {
}
func TestEtcdGetControllerNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/controllers/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -456,7 +457,7 @@ func TestEtcdGetControllerNotFound(t *testing.T) {
}
func TestEtcdDeleteController(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteController("foo")
expectNoError(t, err)
@ -470,7 +471,7 @@ func TestEtcdDeleteController(t *testing.T) {
}
func TestEtcdCreateController(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateController(api.ReplicationController{
JSONBase: api.JSONBase{
@ -489,7 +490,7 @@ func TestEtcdCreateController(t *testing.T) {
}
func TestEtcdUpdateController(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateController(api.ReplicationController{
@ -506,9 +507,9 @@ func TestEtcdUpdateController(t *testing.T) {
}
func TestEtcdListServices(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/services/specs"
fakeClient.Data[key] = util.EtcdResponseWithError{
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
@ -532,8 +533,8 @@ func TestEtcdListServices(t *testing.T) {
}
func TestEtcdCreateService(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -555,7 +556,7 @@ func TestEtcdCreateService(t *testing.T) {
}
func TestEtcdGetService(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
service, err := registry.GetService("foo")
@ -566,8 +567,8 @@ func TestEtcdGetService(t *testing.T) {
}
func TestEtcdGetServiceNotFound(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -583,7 +584,7 @@ func TestEtcdGetServiceNotFound(t *testing.T) {
}
func TestEtcdDeleteService(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteService("foo")
expectNoError(t, err)
@ -601,7 +602,7 @@ func TestEtcdDeleteService(t *testing.T) {
}
func TestEtcdUpdateService(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
testService := api.Service{
@ -623,7 +624,7 @@ func TestEtcdUpdateService(t *testing.T) {
}
func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := util.MakeFakeEtcdClient(t)
fakeClient := tools.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints := api.Endpoints{
Name: "foo",

18
pkg/tools/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package tools implements general tools which depend on the api package.
package tools

View File

@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package tools
import (
"encoding/json"
"fmt"
"reflect"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/coreos/go-etcd/etcd"
)
@ -138,15 +139,29 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
body = response.Node.Value
return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr)
err = json.Unmarshal([]byte(body), objPtr)
if jsonBase, err := api.FindJSONBase(objPtr); err == nil {
jsonBase.ResourceVersion = response.Node.ModifiedIndex
// Note that err shadows the err returned below, so we won't
// return an error just because we failed to find a JSONBase.
// This is intentional.
}
return body, response.Node.ModifiedIndex, err
}
// SetObj marshals obj via json, and stores under key.
// SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion)
return err // err is shadowed!
}
// TODO: when client supports atomic creation, integrate this with the above.
_, err = h.Client.Set(key, string(data), 0)
return err
}

View File

@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package tools
import (
"fmt"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
@ -83,7 +84,7 @@ func TestExtractList(t *testing.T) {
func TestExtractObj(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
expect := testMarshalType{ID: "foo"}
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
fakeClient.Set("/some/key", util.MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient}
var got testMarshalType
err := helper.ExtractObj("/some/key", &got, false)
@ -143,7 +144,7 @@ func TestSetObj(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := MakeJSONString(obj)
expect := util.MakeJSONString(obj)
got := fakeClient.Data["/some/key"].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package tools
import (
"fmt"

View File

@ -15,5 +15,6 @@ limitations under the License.
*/
// Package util implements various utility functions used in both testing and implementation
// of Kubernetes
// of Kubernetes. Package util may not depend on any other package in the Kubernetes
// package tree.
package util