diff --git a/pkg/election/doc.go b/pkg/election/doc.go deleted file mode 100644 index 6982d3ec8aa..00000000000 --- a/pkg/election/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package election provides interfaces used for master election. -package election diff --git a/pkg/election/etcd_master.go b/pkg/election/etcd_master.go deleted file mode 100644 index 35775f2a03f..00000000000 --- a/pkg/election/etcd_master.go +++ /dev/null @@ -1,185 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "fmt" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" - "github.com/golang/glog" -) - -// Master is used to announce the current elected master. -type Master string - -// IsAnAPIObject is used solely so we can work with the watch package. -// TODO: Either fix watch so this isn't necessary, or make this a real API Object. -// TODO: when it becomes clear how this package will be used, move these declarations to -// to the proper place. -func (Master) IsAnAPIObject() {} - -// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd. -func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector { - return &etcdMasterElector{etcd: h} -} - -type empty struct{} - -// internal implementation struct -type etcdMasterElector struct { - etcd tools.EtcdGetSet - done chan empty - events chan watch.Event -} - -// Elect implements the election.MasterElector interface. -func (e *etcdMasterElector) Elect(path, id string) watch.Interface { - e.done = make(chan empty) - e.events = make(chan watch.Event) - go util.Forever(func() { e.run(path, id) }, time.Second*5) - return e -} - -func (e *etcdMasterElector) run(path, id string) { - masters := make(chan string) - errors := make(chan error) - go e.master(path, id, 30, masters, errors, e.done) - for { - select { - case m := <-masters: - e.events <- watch.Event{ - Type: watch.Modified, - Object: Master(m), - } - case e := <-errors: - glog.Errorf("error in election: %v", e) - } - } -} - -// ResultChan implements the watch.Interface interface. -func (e *etcdMasterElector) ResultChan() <-chan watch.Event { - return e.events -} - -// extendMaster attempts to extend ownership of a master lock for TTL seconds. -// returns "", nil if extension failed -// returns id, nil if extension succeeded -// returns "", err if an error occurred -func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.Response) (string, error) { - // If it matches the passed in id, extend the lease by writing a new entry. - // Uses compare and swap, so that if we TTL out in the meantime, the write will fail. - // We don't handle the TTL delete w/o a write case here, it's handled in the next loop - // iteration. - _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) - if err != nil && !tools.IsEtcdTestFailed(err) { - return "", err - } - if err != nil && tools.IsEtcdTestFailed(err) { - return "", nil - } - return id, nil -} - -// becomeMaster attempts to become the master for this lock. -// returns "", nil if the attempt failed -// returns id, nil if the attempt succeeded -// returns "", err if an error occurred -func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { - _, err := e.etcd.Create(path, id, ttl) - if err != nil && !tools.IsEtcdNodeExist(err) { - // unexpected error - return "", err - } - if err != nil && tools.IsEtcdNodeExist(err) { - return "", nil - } - return id, nil -} - -// handleMaster performs one loop of master locking. -// on success it returns , nil -// on error it returns "", err -// in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock) -// it returns "", nil -func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) { - res, err := e.etcd.Get(path, false, false) - - // Unexpected error, bail out - if err != nil && !tools.IsEtcdNotFound(err) { - return "", err - } - - // There is no master, try to become the master. - if err != nil && tools.IsEtcdNotFound(err) { - return e.becomeMaster(path, id, ttl) - } - - // This should never happen. - if res.Node == nil { - return "", fmt.Errorf("unexpected response: %#v", res) - } - - // We're not the master, just return the current value - if res.Node.Value != id { - return res.Node.Value, nil - } - - // We are the master, try to extend out lease - return e.extendMaster(path, id, ttl, res) -} - -// master provices a distributed master election lock, maintains lock until failure, or someone sends something in the done channel. -// The basic algorithm is: -// while !done -// Get the current master -// If there is no current master -// Try to become the master -// Otherwise -// If we are the master, extend the lease -// If the master is different than the last time through the loop, report the master -// Sleep 80% of TTL -func (e *etcdMasterElector) master(path, id string, ttl uint64, masters chan<- string, errors chan<- error, done <-chan empty) { - lastMaster := "" - for { - master, err := e.handleMaster(path, id, ttl) - if err != nil { - errors <- err - } else if len(master) == 0 { - continue - } else if master != lastMaster { - lastMaster = master - masters <- master - } - // TODO: Add Watch here, skip the polling for faster reactions - // If done is closed, break out. - select { - case <-done: - return - case <-time.After(time.Duration((ttl*8)/10) * time.Second): - } - } -} - -// ResultChan implements the watch.Interface interface -func (e *etcdMasterElector) Stop() { - close(e.done) -} diff --git a/pkg/election/etcd_master_test.go b/pkg/election/etcd_master_test.go deleted file mode 100644 index a34b3eb3c8c..00000000000 --- a/pkg/election/etcd_master_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" -) - -func TestEtcdMasterOther(t *testing.T) { - path := "foo" - etcd := tools.NewFakeEtcdClient(t) - etcd.Set(path, "baz", 0) - master := NewEtcdMasterElector(etcd) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "baz" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} - -func TestEtcdMasterNoOther(t *testing.T) { - path := "foo" - e := tools.NewFakeEtcdClient(t) - e.TestIndex = true - e.Data["foo"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNotFound, - }, - } - master := NewEtcdMasterElector(e) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "bar" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} - -func TestEtcdMasterNoOtherThenConflict(t *testing.T) { - path := "foo" - e := tools.NewFakeEtcdClient(t) - e.TestIndex = true - // Ok, so we set up a chain of responses from etcd: - // 1) Nothing there - // 2) conflict (someone else wrote) - // 3) new value (the data they wrote) - empty := tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNotFound, - }, - } - empty.N = &tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNodeExist, - }, - } - empty.N.N = &tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: "baz", - }, - }, - } - e.Data["foo"] = empty - master := NewEtcdMasterElector(e) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "bar" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} diff --git a/pkg/election/fake.go b/pkg/election/fake.go deleted file mode 100644 index 27852b62bbf..00000000000 --- a/pkg/election/fake.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -// Fake allows for testing of anything consuming a MasterElector. -type Fake struct { - mux *watch.Mux - currentMaster Master - lock sync.Mutex // Protect access of currentMaster -} - -// NewFake makes a new fake MasterElector. -func NewFake() *Fake { - // 0 means block for clients. - return &Fake{mux: watch.NewMux(0)} -} - -func (f *Fake) ChangeMaster(newMaster Master) { - f.lock.Lock() - defer f.lock.Unlock() - f.mux.Action(watch.Modified, newMaster) - f.currentMaster = newMaster -} - -func (f *Fake) Elect(path, id string) watch.Interface { - f.lock.Lock() - defer f.lock.Unlock() - w := f.mux.Watch() - if f.currentMaster != "" { - f.mux.Action(watch.Modified, f.currentMaster) - } - return w -} diff --git a/pkg/election/master.go b/pkg/election/master.go deleted file mode 100644 index 7cb59ffce2b..00000000000 --- a/pkg/election/master.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - - "github.com/golang/glog" -) - -// MasterElector is an interface for services that can elect masters. -// Important Note: MasterElectors are not inter-operable, all participants in the election need to be -// using the same underlying implementation of this interface for correct behavior. -type MasterElector interface { - // RequestMaster makes the caller represented by 'id' enter into a master election for the - // distributed lock defined by 'path' - // The returned watch.Interface provides a stream of Master objects which - // contain the current master. - // Calling Stop on the returned interface relinquishes ownership (if currently possesed) - // and removes the caller from the election - Elect(path, id string) watch.Interface -} - -// Service represents anything that can start and stop on demand. -type Service interface { - Start() - Stop() -} - -type notifier struct { - lock sync.Mutex - cond *sync.Cond - - // desired is updated with every change, current is updated after - // Start()/Stop() finishes. 'cond' is used to signal that a change - // might be needed. This handles the case where mastership flops - // around without calling Start()/Stop() excessively. - desired, current Master - - // for comparison, to see if we are master. - id Master - - service Service -} - -// Notify runs Elect() on m, and calls Start()/Stop() on s when the -// elected master starts/stops matching 'id'. Never returns. -func Notify(m MasterElector, path, id string, s Service) { - n := ¬ifier{id: Master(id), service: s} - n.cond = sync.NewCond(&n.lock) - go n.serviceLoop() - for { - w := m.Elect(path, id) - for { - event, open := <-w.ResultChan() - if !open { - break - } - if event.Type != watch.Modified { - continue - } - electedMaster, ok := event.Object.(Master) - if !ok { - glog.Errorf("Unexpected object from election channel: %v", event.Object) - break - } - func() { - n.lock.Lock() - defer n.lock.Unlock() - n.desired = electedMaster - if n.desired != n.current { - n.cond.Signal() - } - }() - } - } -} - -// serviceLoop waits for changes, and calls Start()/Stop() as needed. -func (n *notifier) serviceLoop() { - n.lock.Lock() - defer n.lock.Unlock() - for { - for n.desired == n.current { - n.cond.Wait() - } - if n.current != n.id && n.desired == n.id { - n.service.Start() - } else if n.current == n.id && n.desired != n.id { - n.service.Stop() - } - n.current = n.desired - } -} diff --git a/pkg/election/master_test.go b/pkg/election/master_test.go deleted file mode 100644 index 478602b5cec..00000000000 --- a/pkg/election/master_test.go +++ /dev/null @@ -1,82 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "testing" - "time" -) - -type slowService struct { - t *testing.T - on bool - // We explicitly have no lock to prove that - // Start and Stop are not called concurrently. - changes chan<- bool -} - -func (s *slowService) Start() { - if s.on { - s.t.Errorf("started already on service") - } - time.Sleep(2 * time.Millisecond) - s.on = true - s.changes <- true -} - -func (s *slowService) Stop() { - if !s.on { - s.t.Errorf("stopped already off service") - } - time.Sleep(2 * time.Millisecond) - s.on = false - s.changes <- false -} - -func Test(t *testing.T) { - m := NewFake() - changes := make(chan bool, 1500) - s := &slowService{t: t, changes: changes} - go Notify(m, "", "me", s) - - done := make(chan struct{}) - go func() { - for i := 0; i < 500; i++ { - for _, key := range []string{"me", "notme", "alsonotme"} { - m.ChangeMaster(Master(key)) - } - } - close(done) - }() - - <-done - time.Sleep(8 * time.Millisecond) - close(changes) - - changeList := []bool{} - for { - change, ok := <-changes - if !ok { - break - } - changeList = append(changeList, change) - } - - if len(changeList) > 1000 { - t.Errorf("unexpected number of changes: %v", len(changeList)) - } -}