diff --git a/pkg/election/doc.go b/pkg/election/doc.go new file mode 100644 index 00000000000..6982d3ec8aa --- /dev/null +++ b/pkg/election/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package election provides interfaces used for master election. +package election diff --git a/pkg/election/etcd_master.go b/pkg/election/etcd_master.go new file mode 100644 index 00000000000..f9e7dd1aeae --- /dev/null +++ b/pkg/election/etcd_master.go @@ -0,0 +1,176 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" +) + +// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd +func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector { + return &etcdMasterElector{etcd: h} +} + +type empty struct{} + +// internal implementation struct +type etcdMasterElector struct { + etcd tools.EtcdGetSet + done chan empty + events chan watch.Event +} + +// Elect implements the election.MasterElector interface +func (e *etcdMasterElector) Elect(path, id string) watch.Interface { + e.done = make(chan empty) + e.events = make(chan watch.Event) + go util.Forever(func() { e.run(path, id) }, time.Second*5) + return e +} + +func (e *etcdMasterElector) run(path, id string) { + masters := make(chan string) + errors := make(chan error) + go e.master(path, id, 30, masters, errors, e.done) + for { + select { + case m := <-masters: + e.events <- watch.Event{ + Type: watch.Modified, + Object: m, + } + case e := <-errors: + glog.Errorf("error in election: %v", e) + } + } +} + +// ResultChan implements the watch.Interface interface +func (e *etcdMasterElector) ResultChan() <-chan watch.Event { + return e.events +} + +// extendMaster attempts to extend ownership of a master lock for TTL seconds +// returns "", nil if extension failed +// returns id, nil if extension succeeded +// returns "", err if an error occurred +func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.Response) (string, error) { + // If it matches the passed in id, extend the lease by writing a new entry. + // Uses compare and swap, so that if we TTL out in the meantime, the write will fail. + // We don't handle the TTL delete w/o a write case here, it's handled in the next loop + // iteration. + _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) + if err != nil && !tools.IsEtcdTestFailed(err) { + return "", err + } + if err != nil && tools.IsEtcdTestFailed(err) { + return "", nil + } + return id, nil +} + +// becomeMaster attempts to become the master for this lock +// returns "", nil if the attempt failed +// returns id, nil if the attempt succeeded +// returns "", err if an error occurred +func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { + _, err := e.etcd.Create(path, id, ttl) + if err != nil && !tools.IsEtcdNodeExist(err) { + // unexpected error + return "", err + } + if err != nil && tools.IsEtcdNodeExist(err) { + return "", nil + } + return id, nil +} + +// handleMaster performs one loop of master locking +// on success it returns , nil +// on error it returns "", err +// in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock) +// it returns "", nil +func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) { + res, err := e.etcd.Get(path, false, false) + + // Unexpected error, bail out + if err != nil && !tools.IsEtcdNotFound(err) { + return "", err + } + + // There is no master, try to become the master. + if err != nil && tools.IsEtcdNotFound(err) { + return e.becomeMaster(path, id, ttl) + } + + // This should never happen. + if res.Node == nil { + return "", fmt.Errorf("unexpected response: %#v", res) + } + + // We're not the master, just return the current value + if res.Node.Value != id { + return res.Node.Value, nil + } + + // We are the master, try to extend out lease + return e.extendMaster(path, id, ttl, res) +} + +// Master provices a distributed master election lock, maintains lock until failure, or someone sends something in the done channel +// The basic algorithm is: +// while !done +// Get the current master +// If there is no current master +// Try to become the master +// Otherwise +// If we are the master, extend the lease +// If the master is different than the last time through the loop, report the master +// Sleep 80% of TTL +func (e *etcdMasterElector) master(path, id string, ttl uint64, masters chan<- string, errors chan<- error, done <-chan empty) { + lastMaster := "" + for { + master, err := e.handleMaster(path, id, ttl) + if err != nil { + errors <- err + } else if len(master) == 0 { + continue + } else if master != lastMaster { + lastMaster = master + masters <- master + } + // TODO: Add Watch here, skip the polling for faster reactions + // If done is closed, break out. + select { + case <-done: + return + case <-time.After(time.Duration((ttl*8)/10) * time.Second): + } + } +} + +// ResultChan implements the watch.Interface interface +func (e *etcdMasterElector) Stop() { + close(e.done) +} diff --git a/pkg/election/etcd_master_test.go b/pkg/election/etcd_master_test.go new file mode 100644 index 00000000000..5e342153f4f --- /dev/null +++ b/pkg/election/etcd_master_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/coreos/go-etcd/etcd" +) + +func TestEtcdMasterOther(t *testing.T) { + path := "foo" + etcd := tools.MakeFakeEtcdClient(t) + etcd.Set(path, "baz", 0) + master := NewEtcdMasterElector(etcd) + w := master.Elect(path, "bar") + result := <-w.ResultChan() + if result.Type != watch.Modified || result.Object.(string) != "baz" { + t.Errorf("unexpected event: %#v", result) + } + w.Stop() +} + +func TestEtcdMasterNoOther(t *testing.T) { + path := "foo" + e := tools.MakeFakeEtcdClient(t) + e.TestIndex = true + e.Data["foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + ErrorCode: tools.EtcdErrorCodeNotFound, + }, + } + master := NewEtcdMasterElector(e) + w := master.Elect(path, "bar") + result := <-w.ResultChan() + if result.Type != watch.Modified || result.Object.(string) != "bar" { + t.Errorf("unexpected event: %#v", result) + } + w.Stop() +} + +func TestEtcdMasterNoOtherThenConflict(t *testing.T) { + path := "foo" + e := tools.MakeFakeEtcdClient(t) + e.TestIndex = true + // Ok, so we set up a chain of responses from etcd: + // 1) Nothing there + // 2) conflict (someone else wrote) + // 3) new value (the data they wrote) + empty := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + ErrorCode: tools.EtcdErrorCodeNotFound, + }, + } + empty.N = &tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: &etcd.EtcdError{ + ErrorCode: tools.EtcdErrorCodeNodeExist, + }, + } + empty.N.N = &tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: "baz", + }, + }, + } + e.Data["foo"] = empty + master := NewEtcdMasterElector(e) + w := master.Elect(path, "bar") + result := <-w.ResultChan() + if result.Type != watch.Modified || result.Object.(string) != "bar" { + t.Errorf("unexpected event: %#v", result) + } + w.Stop() +} diff --git a/pkg/election/master.go b/pkg/election/master.go new file mode 100644 index 00000000000..14b16da5564 --- /dev/null +++ b/pkg/election/master.go @@ -0,0 +1,34 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// MasterElector is an interface for services that can elect masters. +// Important Note: MasterElectors are not inter-operable, all participants in the election need to be +// using the same underlying implementation of this interface for correct behavior. +type MasterElector interface { + // RequestMaster makes the caller represented by 'id' enter into a master election for the + // distributed lock defined by 'path' + // The returned watch.Interface provides a stream of Master objects which + // contain the current master. + // Calling Stop on the returned interface relinquishes ownership (if currently possesed) + // and removes the caller from the election + Elect(path, id string) watch.Interface +} diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 9c5e4e10a6b..a77fdb36f5d 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -91,16 +91,16 @@ func IsEtcdNotFound(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeNotFound) } -// IsEtcdTestFailed returns true iff err is an etcd write conflict. -func IsEtcdTestFailed(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) -} - // IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. func IsEtcdNodeExist(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) } +// IsEtcdTestFailed returns true iff err is an etcd write conflict. +func IsEtcdTestFailed(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) +} + // IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. func IsEtcdWatchStoppedByUser(err error) bool { return etcd.ErrWatchStoppedByUser == err diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 257b5907047..58e7d68461f 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -27,6 +27,8 @@ import ( type EtcdResponseWithError struct { R *etcd.Response E error + // if N is non-null, it will be assigned into the map after this response is used for an operation + N *EtcdResponseWithError } // TestLogger is a type passed to Test functions to support formatted test logs. @@ -92,6 +94,15 @@ func (f *FakeEtcdClient) generateIndex() uint64 { return f.ChangeIndex } +// Requires that f.Mutex be held. +func (f *FakeEtcdClient) updateResponse(key string) { + resp, found := f.Data[key] + if !found || resp.N == nil { + return + } + f.Data[key] = *resp.N +} + func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { f.Mutex.Lock() defer f.Mutex.Unlock() @@ -103,6 +114,7 @@ func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { f.Mutex.Lock() defer f.Mutex.Unlock() + defer f.updateResponse(key) result := f.Data[key] if result.R == nil { @@ -161,6 +173,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { f.Mutex.Lock() defer f.Mutex.Unlock() + defer f.updateResponse(key) return f.setLocked(key, value, ttl) } @@ -182,6 +195,7 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue f.Mutex.Lock() defer f.Mutex.Unlock() + defer f.updateResponse(key) if !f.nodeExists(key) { f.t.Logf("c&s: node doesn't exist") @@ -206,6 +220,7 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { f.Mutex.Lock() defer f.Mutex.Unlock() + defer f.updateResponse(key) if f.nodeExists(key) { return nil, EtcdErrorNodeExist