From b25f950362185f7d561bcfbf013088b08f6f533c Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 26 Jun 2014 20:24:10 -0700 Subject: [PATCH] Use etcd compare and swap to update the list of pods, to remove a race. --- api/examples/controller.json | 1 + cmd/integration/integration.go | 9 ++-- pkg/registry/etcd_registry.go | 84 ++++++++++++++++++------------ pkg/registry/etcd_registry_test.go | 15 ++++++ pkg/util/etcd_tools.go | 48 ++++++++++++++--- pkg/util/etcd_tools_test.go | 6 +-- pkg/util/fake_etcd_client.go | 18 +++---- 7 files changed, 124 insertions(+), 57 deletions(-) diff --git a/api/examples/controller.json b/api/examples/controller.json index 8b6da66ca85..998db43aaa5 100644 --- a/api/examples/controller.json +++ b/api/examples/controller.json @@ -7,6 +7,7 @@ "desiredState": { "manifest": { "containers": [{ + "name": "nginx", "image": "dockerfile/nginx", "ports": [{"containerPort": 80, "hostPort": 8080}] }] diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index cd084d78310..66b214720f3 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -23,6 +23,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "runtime" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -36,6 +37,7 @@ import ( ) func main() { + runtime.GOMAXPROCS(4) util.InitLogs() defer util.FlushLogs() @@ -51,7 +53,7 @@ func main() { controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil)) - controllerManager.Run(10 * time.Second) + controllerManager.Run(1 * time.Second) // Kublet fakeDocker1 := &kubelet.FakeDockerClient{} @@ -102,7 +104,7 @@ func main() { // Validate that they're truly up. pods, err := kubeClient.ListPods(nil) if err != nil || len(pods.Items) != 2 { - glog.Fatal("FAILED") + glog.Fatal("FAILED: %#v", pods.Items) } // Check that kubelet tried to make the pods. @@ -124,7 +126,7 @@ func main() { // We expect 5: 2 net containers + 2 pods from the replication controller + // 1 net container + 2 pods from the URL. if len(createdPods) != 7 { - glog.Fatalf("Unexpected list of created pods: %#v\n", createdPods) + glog.Fatalf("Unexpected list of created pods: %#v %#v %#v\n", createdPods, fakeDocker1.Created, fakeDocker2.Created) } glog.Infof("OK") } @@ -146,6 +148,7 @@ const ( // This is copied from, and should be kept in sync with: // https://raw.githubusercontent.com/GoogleCloudPlatform/container-vm-guestbook-redis-python/master/manifest.yaml testManifestFile = `version: v1beta1 +id: web-test containers: - name: redis image: dockerfile/redis diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index 9c608e3b370..b2a7a68c898 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -90,13 +90,17 @@ func makeContainerKey(machine string) string { return "/registry/hosts/" + machine + "/kubelet" } -func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) { - err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) - return +func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) { + err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) + return manifests, index, err } -func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error { - return registry.helper().SetObj(makeContainerKey(machine), manifests) +func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest, index uint64) error { + if index != 0 { + return registry.helper().CompareAndSwapObj(makeContainerKey(machine), manifests, index) + } else { + return registry.helper().SetObj(makeContainerKey(machine), manifests) + } } func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { @@ -108,7 +112,7 @@ func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { } func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { - manifests, err := registry.loadManifests(machine) + manifests, index, err := registry.loadManifests(machine) if err != nil { return err } @@ -124,8 +128,18 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { if err != nil { return err } - manifests = append(manifests, manifest) - return registry.updateManifests(machine, manifests) + for { + manifests = append(manifests, manifest) + err = registry.updateManifests(machine, manifests, index) + if util.IsEtcdConflict(err) { + manifests, index, err = registry.loadManifests(machine) + if err != nil { + return err + } + continue + } + return err + } } func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { @@ -141,36 +155,42 @@ func (registry *EtcdRegistry) DeletePod(podID string) error { } func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error { - manifests, err := registry.loadManifests(machine) - if err != nil { - return err - } - newManifests := make([]api.ContainerManifest, 0) - found := false - for _, manifest := range manifests { - if manifest.Id != podID { - newManifests = append(newManifests, manifest) - } else { - found = true + for { + manifests, index, err := registry.loadManifests(machine) + if err != nil { + return err } - } - if !found { - // This really shouldn't happen, it indicates something is broken, and likely - // there is a lost pod somewhere. - // However it is "deleted" so log it and move on - glog.Infof("Couldn't find: %s in %#v", podID, manifests) - } - if err = registry.updateManifests(machine, newManifests); err != nil { - return err + newManifests := make([]api.ContainerManifest, 0) + found := false + for _, manifest := range manifests { + if manifest.Id != podID { + newManifests = append(newManifests, manifest) + } else { + found = true + } + } + if !found { + // This really shouldn't happen, it indicates something is broken, and likely + // there is a lost pod somewhere. + // However it is "deleted" so log it and move on + glog.Infof("Couldn't find: %s in %#v", podID, manifests) + } + if err = registry.updateManifests(machine, newManifests, index); err != nil { + if util.IsEtcdConflict(err) { + continue + } + return err + } + break } key := makePodKey(machine, podID) - _, err = registry.etcdClient.Delete(key, true) + _, err := registry.etcdClient.Delete(key, true) return err } func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { key := makePodKey(machine, podID) - err = registry.helper().ExtractObj(key, &pod, false) + err, _ = registry.helper().ExtractObj(key, &pod, false) if err != nil { return } @@ -205,7 +225,7 @@ func makeControllerKey(id string) string { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err := registry.helper().ExtractObj(key, &controller, false) + err, _ := registry.helper().ExtractObj(key, &controller, false) if err != nil { return nil, err } @@ -244,7 +264,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error { func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err := registry.helper().ExtractObj(key, &svc, false) + err, _ := registry.helper().ExtractObj(key, &svc, false) if err != nil { return nil, err } diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index 29de298a383..e48475334b9 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -640,3 +640,18 @@ func TestEtcdUpdateEndpoints(t *testing.T) { t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints) } } + +// TODO We need a test for the compare and swap behavior. This basically requires two things: +// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that +// channel, this will enable us to orchestrate the flow of etcd requests in the test. +// 2) We need to make the map from key to (response, error) actually be a [](response, error) and pop +// our way through the responses. That will enable us to hand back multiple different responses for +// the same key. +// Once that infrastructure is in place, the test looks something like: +// Routine #1 Routine #2 +// Read +// Wait for sync on update Read +// Update +// Update +// In the buggy case, this will result in lost data. In the correct case, the second update should fail +// and be retried. diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go index e97903c4aec..26b2641939c 100644 --- a/pkg/util/etcd_tools.go +++ b/pkg/util/etcd_tools.go @@ -24,10 +24,24 @@ import ( "github.com/coreos/go-etcd/etcd" ) +// EtcdClient is an injectable interface for testing. +type EtcdClient interface { + AddChild(key, data string, ttl uint64) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) + CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + // I'd like to use directional channels here (e.g. <-chan) but this interface mimics + // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + // Interface exposing only the etcd operations needed by EtcdHelper. type EtcdGetSet interface { Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) + CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) } // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. @@ -37,6 +51,16 @@ type EtcdHelper struct { // Returns true iff err is an etcd not found error. func IsEtcdNotFound(err error) bool { + return isEtcdErrorNum(err, 100) +} + +// Returns true iff err is an etcd write conflict. +func IsEtcdConflict(err error) bool { + return isEtcdErrorNum(err, 101) +} + +// Returns true iff err is an etcd error, whose errorCode matches errorCode +func isEtcdErrorNum(err error, errorCode int) bool { if err == nil { return false } @@ -46,7 +70,7 @@ func IsEtcdNotFound(err error) bool { if etcdError == nil { return false } - if etcdError.ErrorCode == 100 { + if etcdError.ErrorCode == errorCode { return true } } @@ -92,23 +116,33 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // Unmarshals json found at key into objPtr. On a not found error, will either return // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. -func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { +func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) { response, err := h.Client.Get(key, false, false) if err != nil && !IsEtcdNotFound(err) { - return err + return err, 0 } if err != nil || response.Node == nil || len(response.Node.Value) == 0 { if ignoreNotFound { pv := reflect.ValueOf(objPtr) pv.Elem().Set(reflect.Zero(pv.Type().Elem())) - return nil + return nil, 0 } else if err != nil { - return err + return err, 0 } - return fmt.Errorf("key '%v' found no nodes field: %#v", key, response) + return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0 } - return json.Unmarshal([]byte(response.Node.Value), objPtr) + return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex +} + +// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index +func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error { + data, err := json.Marshal(obj) + if err != nil { + return err + } + _, err = h.Client.CompareAndSwap(key, string(data), 0, "", index) + return err } // SetObj marshals obj via json, and stores under key. diff --git a/pkg/util/etcd_tools_test.go b/pkg/util/etcd_tools_test.go index 30c4c6243b9..a322e504216 100644 --- a/pkg/util/etcd_tools_test.go +++ b/pkg/util/etcd_tools_test.go @@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) { fakeClient.Set("/some/key", MakeJSONString(expect), 0) helper := EtcdHelper{fakeClient} var got testMarshalType - err := helper.ExtractObj("/some/key", &got, false) + err, _ := helper.ExtractObj("/some/key", &got, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) { helper := EtcdHelper{fakeClient} try := func(key string) { var got testMarshalType - err := helper.ExtractObj(key, &got, false) + err, _ := helper.ExtractObj(key, &got, false) if err == nil { t.Errorf("%s: wanted error but didn't get one", key) } - err = helper.ExtractObj(key, &got, true) + err, _ = helper.ExtractObj(key, &got, true) if err != nil { t.Errorf("%s: didn't want error but got %#v", key, err) } diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 78da6b0689b..918af95064c 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -23,18 +23,6 @@ import ( "github.com/coreos/go-etcd/etcd" ) -// EtcdClient is an injectable interface for testing. -type EtcdClient interface { - AddChild(key, data string, ttl uint64) (*etcd.Response, error) - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - // I'd like to use directional channels here (e.g. <-chan) but this interface mimics - // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - type EtcdResponseWithError struct { R *etcd.Response E error @@ -87,6 +75,12 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err f.Data[key] = result return result.R, f.Err } + +func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) { + // TODO: Maybe actually implement compare and swap here? + return f.Set(key, value, ttl) +} + func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { return f.Set(key, value, ttl) }