From d05dad6c59ebb15675adee0c9b58692da5485730 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:51:48 -0700 Subject: [PATCH 1/9] Add runner utility. --- pkg/util/runner.go | 58 +++++++++++++++++++++++++++++++++++++++++ pkg/util/runner_test.go | 55 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 pkg/util/runner.go create mode 100644 pkg/util/runner_test.go diff --git a/pkg/util/runner.go b/pkg/util/runner.go new file mode 100644 index 00000000000..85cd7e21975 --- /dev/null +++ b/pkg/util/runner.go @@ -0,0 +1,58 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" +) + +// Runner is an abstraction to make it easy to start and stop groups of things that can be +// described by a single function which waits on a channel close to exit. +type Runner struct { + lock sync.Mutex + loopFuncs []func(stop chan struct{}) + stop *chan struct{} +} + +// NewRunner makes a runner for the given function(s). The function(s) should loop until +// the channel is closed. +func NewRunner(f ...func(stop chan struct{})) *Runner { + return &Runner{loopFuncs: f} +} + +// Start begins running. +func (r *Runner) Start() { + r.lock.Lock() + defer r.lock.Unlock() + if r.stop == nil { + c := make(chan struct{}) + r.stop = &c + for i := range r.loopFuncs { + go r.loopFuncs[i](*r.stop) + } + } +} + +// Stop stops running. +func (r *Runner) Stop() { + r.lock.Lock() + defer r.lock.Unlock() + if r.stop != nil { + close(*r.stop) + r.stop = nil + } +} diff --git a/pkg/util/runner_test.go b/pkg/util/runner_test.go new file mode 100644 index 00000000000..0639edc52d2 --- /dev/null +++ b/pkg/util/runner_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "sync" + "testing" +) + +func TestRunner(t *testing.T) { + var ( + lock sync.Mutex + events []string + funcs []func(chan struct{}) + ) + done := make(chan struct{}, 20) + for i := 0; i < 10; i++ { + iCopy := i + funcs = append(funcs, func(c chan struct{}) { + lock.Lock() + events = append(events, fmt.Sprintf("%v starting\n", iCopy)) + lock.Unlock() + <-c + lock.Lock() + events = append(events, fmt.Sprintf("%v stopping\n", iCopy)) + lock.Unlock() + done <- struct{}{} + }) + } + + r := NewRunner(funcs...) + r.Start() + r.Stop() + for i := 0; i < 10; i++ { + <-done + } + if len(events) != 20 { + t.Errorf("expected 20 events, but got:\n%v\n", events) + } +} From 43112732949a5a8edb410e2b542cd723d0c64f32 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:52:15 -0700 Subject: [PATCH 2/9] Expand choices for using election code. --- pkg/election/fake.go | 53 ++++++++++++++++++++++++ pkg/election/master.go | 76 ++++++++++++++++++++++++++++++++++ pkg/election/master_test.go | 82 +++++++++++++++++++++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 pkg/election/fake.go create mode 100644 pkg/election/master_test.go diff --git a/pkg/election/fake.go b/pkg/election/fake.go new file mode 100644 index 00000000000..27852b62bbf --- /dev/null +++ b/pkg/election/fake.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Fake allows for testing of anything consuming a MasterElector. +type Fake struct { + mux *watch.Mux + currentMaster Master + lock sync.Mutex // Protect access of currentMaster +} + +// NewFake makes a new fake MasterElector. +func NewFake() *Fake { + // 0 means block for clients. + return &Fake{mux: watch.NewMux(0)} +} + +func (f *Fake) ChangeMaster(newMaster Master) { + f.lock.Lock() + defer f.lock.Unlock() + f.mux.Action(watch.Modified, newMaster) + f.currentMaster = newMaster +} + +func (f *Fake) Elect(path, id string) watch.Interface { + f.lock.Lock() + defer f.lock.Unlock() + w := f.mux.Watch() + if f.currentMaster != "" { + f.mux.Action(watch.Modified, f.currentMaster) + } + return w +} diff --git a/pkg/election/master.go b/pkg/election/master.go index 14b16da5564..7cb59ffce2b 100644 --- a/pkg/election/master.go +++ b/pkg/election/master.go @@ -17,7 +17,11 @@ limitations under the License. package election import ( + "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" ) // MasterElector is an interface for services that can elect masters. @@ -32,3 +36,75 @@ type MasterElector interface { // and removes the caller from the election Elect(path, id string) watch.Interface } + +// Service represents anything that can start and stop on demand. +type Service interface { + Start() + Stop() +} + +type notifier struct { + lock sync.Mutex + cond *sync.Cond + + // desired is updated with every change, current is updated after + // Start()/Stop() finishes. 'cond' is used to signal that a change + // might be needed. This handles the case where mastership flops + // around without calling Start()/Stop() excessively. + desired, current Master + + // for comparison, to see if we are master. + id Master + + service Service +} + +// Notify runs Elect() on m, and calls Start()/Stop() on s when the +// elected master starts/stops matching 'id'. Never returns. +func Notify(m MasterElector, path, id string, s Service) { + n := ¬ifier{id: Master(id), service: s} + n.cond = sync.NewCond(&n.lock) + go n.serviceLoop() + for { + w := m.Elect(path, id) + for { + event, open := <-w.ResultChan() + if !open { + break + } + if event.Type != watch.Modified { + continue + } + electedMaster, ok := event.Object.(Master) + if !ok { + glog.Errorf("Unexpected object from election channel: %v", event.Object) + break + } + func() { + n.lock.Lock() + defer n.lock.Unlock() + n.desired = electedMaster + if n.desired != n.current { + n.cond.Signal() + } + }() + } + } +} + +// serviceLoop waits for changes, and calls Start()/Stop() as needed. +func (n *notifier) serviceLoop() { + n.lock.Lock() + defer n.lock.Unlock() + for { + for n.desired == n.current { + n.cond.Wait() + } + if n.current != n.id && n.desired == n.id { + n.service.Start() + } else if n.current == n.id && n.desired != n.id { + n.service.Stop() + } + n.current = n.desired + } +} diff --git a/pkg/election/master_test.go b/pkg/election/master_test.go new file mode 100644 index 00000000000..478602b5cec --- /dev/null +++ b/pkg/election/master_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "testing" + "time" +) + +type slowService struct { + t *testing.T + on bool + // We explicitly have no lock to prove that + // Start and Stop are not called concurrently. + changes chan<- bool +} + +func (s *slowService) Start() { + if s.on { + s.t.Errorf("started already on service") + } + time.Sleep(2 * time.Millisecond) + s.on = true + s.changes <- true +} + +func (s *slowService) Stop() { + if !s.on { + s.t.Errorf("stopped already off service") + } + time.Sleep(2 * time.Millisecond) + s.on = false + s.changes <- false +} + +func Test(t *testing.T) { + m := NewFake() + changes := make(chan bool, 1500) + s := &slowService{t: t, changes: changes} + go Notify(m, "", "me", s) + + done := make(chan struct{}) + go func() { + for i := 0; i < 500; i++ { + for _, key := range []string{"me", "notme", "alsonotme"} { + m.ChangeMaster(Master(key)) + } + } + close(done) + }() + + <-done + time.Sleep(8 * time.Millisecond) + close(changes) + + changeList := []bool{} + for { + change, ok := <-changes + if !ok { + break + } + changeList = append(changeList, change) + } + + if len(changeList) > 1000 { + t.Errorf("unexpected number of changes: %v", len(changeList)) + } +} From 7209ca154361f8debcfda62299fbb05d7c6bfb83 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:54:33 -0700 Subject: [PATCH 3/9] Make redirect handle namespaces just like proxy (which is weird and needs to be fixed but at least this will be consistent). --- pkg/apiserver/redirect.go | 6 +++++- pkg/apiserver/redirect_test.go | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/apiserver/redirect.go b/pkg/apiserver/redirect.go index f8b499d4e73..a9832291f24 100644 --- a/pkg/apiserver/redirect.go +++ b/pkg/apiserver/redirect.go @@ -30,7 +30,11 @@ type RedirectHandler struct { } func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() + namespace := req.URL.Query().Get("namespace") + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } parts := splitPath(req.URL.Path) if len(parts) != 2 || req.Method != "GET" { notFound(w, req) diff --git a/pkg/apiserver/redirect_test.go b/pkg/apiserver/redirect_test.go index 072c388858c..a17a286a16f 100644 --- a/pkg/apiserver/redirect_test.go +++ b/pkg/apiserver/redirect_test.go @@ -27,6 +27,7 @@ import ( func TestRedirect(t *testing.T) { simpleStorage := &SimpleRESTStorage{ errors: map[string]error{}, + expectedResourceNamespace: "default", } handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, From 3045311398c6abe6045d17f88015cd82b14bd25e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:55:12 -0700 Subject: [PATCH 4/9] Fix subtle bug when proxy constructs outgoing URL. --- pkg/apiserver/proxy.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 879a88819fb..af734f7f7f7 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -113,10 +113,16 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location, err := redirector.ResourceLocation(ctx, id) if err != nil { + httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) return } + if location == "" { + httplog.LogOf(req, w).Addf("ResourceLocation for %v returned ''", id) + notFound(w, req) + return + } destURL, err := url.Parse(location) if err != nil { @@ -124,11 +130,19 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { writeJSON(status.Code, r.codec, status, w) return } + if destURL.Scheme == "" { + // If no scheme was present in location, url.Parse sometimes mistakes + // hosts for paths. + destURL.Host = location + } destURL.Path = rest destURL.RawQuery = req.URL.RawQuery newReq, err := http.NewRequest(req.Method, destURL.String(), req.Body) if err != nil { - glog.Errorf("Failed to create request: %s", err) + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + notFound(w, req) + return } newReq.Header = req.Header From 7146ec9d49c603d365c0186da3863d7c023d343c Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:56:33 -0700 Subject: [PATCH 5/9] Implement kubernetes & kubernetes-ro services --- cmd/apiserver/apiserver.go | 39 ++++++++-- pkg/master/master.go | 77 +++++++++++++++++++ pkg/master/publish.go | 113 ++++++++++++++++++++++++++++ pkg/registry/service/rest.go | 3 + pkg/service/endpoints_controller.go | 6 ++ 5 files changed, 232 insertions(+), 6 deletions(-) create mode 100644 pkg/master/publish.go diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 9017630dda6..7961316c68a 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -42,9 +42,23 @@ import ( ) var ( - port = flag.Uint("port", 8080, "The port to listen on. Default 8080") + // Note: the weird ""+ in below lines seems to be the only way to get gofmt to + // arrange these text blocks sensibly. Grrr. + port = flag.Uint("port", 8080, ""+ + "The port to listen on. Default 8080. It is assumed that firewall rules are "+ + "set up such that this port is not reachable from outside of the cluster. It is "+ + "further assumed that port 443 on the cluster's public address is proxied to this "+ + "port. This is performed by nginx in the default setup.") address = util.IP(net.ParseIP("127.0.0.1")) - readOnlyPort = flag.Uint("read_only_port", 7080, "The port from which to serve read-only resources. If 0, don't serve on a read-only address.") + publicAddressOverride = flag.String("public_address_override", "", ""+ + "Public serving address. Read only port will be opened on this address, "+ + "and it is assumed that port 443 at this address will be proxied/redirected "+ + "to '-address':'-port'. If blank, the address in the first listed interface "+ + "will be used.") + readOnlyPort = flag.Uint("read_only_port", 7080, ""+ + "The port from which to serve read-only resources. If 0, don't serve on a "+ + "read-only address. It is assumed that firewall rules are set up such that "+ + "this port is not reachable from outside of the cluster.") apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") @@ -184,7 +198,7 @@ func main() { n := net.IPNet(portalNet) mux := http.NewServeMux() - m := master.New(&master.Config{ + config := &master.Config{ Client: client, Cloud: cloud, EtcdHelper: helper, @@ -207,13 +221,26 @@ func main() { APIPrefix: *apiPrefix, CorsAllowedOriginList: corsAllowedOriginList, TokenAuthFile: *tokenAuthFile, - }) + ReadOnlyPort: int(*readOnlyPort), + ReadWritePort: int(*port), + PublicAddress: *publicAddressOverride, + } + m := master.New(config) + + roLocation := "" if *readOnlyPort != 0 { + roLocation = net.JoinHostPort(config.PublicAddress, strconv.Itoa(config.ReadOnlyPort)) + } + rwLocation := net.JoinHostPort(address.String(), strconv.Itoa(int(*port))) + + // See the flag commentary to understand our assumptions when opening the read-only and read-write ports. + + if roLocation != "" { // Allow 1 read-only request per second, allow up to 20 in a burst before enforcing. rl := util.NewTokenBucketRateLimiter(1.0, 20) readOnlyServer := &http.Server{ - Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*readOnlyPort))), + Addr: roLocation, Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.Handler))), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, @@ -226,7 +253,7 @@ func main() { } s := &http.Server{ - Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))), + Addr: rwLocation, Handler: apiserver.RecoverPanics(m.Handler), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, diff --git a/pkg/master/master.go b/pkg/master/master.go index b14b3e8851a..09b9420ebb1 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -19,7 +19,9 @@ package master import ( "net" "net/http" + "strconv" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -34,6 +36,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/election" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" @@ -70,6 +73,16 @@ type Config struct { APIPrefix string CorsAllowedOriginList util.StringList TokenAuthFile string + + // The port on PublicAddress where a read-only server will be installed. + // Defaults to 7080 if not set. + ReadOnlyPort int + // The port on PublicAddress where a read-write server will be installed. + // Defaults to 443 if not set. + ReadWritePort int + + // If empty, the first result from net.InterfaceAddrs will be used. + PublicAddress string } // Master contains state for a Kubernetes cluster master/api server. @@ -91,8 +104,18 @@ type Master struct { apiPrefix string corsAllowedOriginList util.StringList tokenAuthFile string + // "Outputs" Handler http.Handler + + elector election.MasterElector + readOnlyServer string + readWriteServer string + electedMasterServices *util.Runner + + // lock must be held when accessing the below read-write members. + lock sync.RWMutex + electedMaster election.Master } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -108,8 +131,44 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil } +// setDefaults fills in any fields not set that are required to have valid data. +func setDefaults(c *Config) { + if c.ReadOnlyPort == 0 { + c.ReadOnlyPort = 7080 + } + if c.ReadWritePort == 0 { + c.ReadWritePort = 443 + } + if c.PublicAddress == "" { + addrs, err := net.InterfaceAddrs() + if err != nil { + glog.Fatalf("Unable to get network interfaces: error='%v'", err) + } + found := false + for i := range addrs { + ip, _, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + glog.Errorf("Error parsing '%v': %v", addrs[i], err) + continue + } + if ip.IsLoopback() { + glog.Infof("'%v' (%v) is a loopback address, ignoring.", ip, addrs[i]) + continue + } + found = true + c.PublicAddress = ip.String() + glog.Infof("Will report %v as public IP address.", ip) + break + } + if !found { + glog.Fatalf("Unable to find suitible network address in list: %v", addrs) + } + } +} + // New returns a new instance of Master connected to the given etcd server. func New(c *Config) *Master { + setDefaults(c) minionRegistry := makeMinionRegistry(c) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) boundPodFactory := &pod.BasicBoundPodFactory{ @@ -131,7 +190,11 @@ func New(c *Config) *Master { apiPrefix: c.APIPrefix, corsAllowedOriginList: c.CorsAllowedOriginList, tokenAuthFile: c.TokenAuthFile, + elector: election.NewEtcdMasterElector(c.EtcdHelper.Client), + readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), + readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))), } + m.electedMasterServices = util.NewRunner(m.serviceWriterLoop, m.electionAnnounce) m.init(c) return m } @@ -188,6 +251,7 @@ func (m *Master) init(c *Config) { // TODO: should appear only in scheduler API group. "bindings": binding.NewREST(m.bindingRegistry), } + apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(m.mux, c.APIPrefix+"/v1beta1") apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(m.mux, c.APIPrefix+"/v1beta2") versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2") @@ -216,6 +280,19 @@ func (m *Master) init(c *Config) { m.mux.HandleFunc("/_whoami", handleWhoAmI(authenticator)) m.Handler = handler + + if m.readWriteServer != "" { + glog.Infof("Starting election services as %v", m.readWriteServer) + go election.Notify(m.elector, "/registry/elections/k8smaster", m.readWriteServer, m.electedMasterServices) + } + + // TODO: start a goroutine to report ourselves to the elected master. +} + +func (m *Master) electionAnnounce(stop chan struct{}) { + glog.Infof("Elected as master") + <-stop + glog.Info("Lost election for master") } // API_v1beta1 returns the resources and codec for API version v1beta1. diff --git a/pkg/master/publish.go b/pkg/master/publish.go new file mode 100644 index 00000000000..6fc19d26cb3 --- /dev/null +++ b/pkg/master/publish.go @@ -0,0 +1,113 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + + "github.com/golang/glog" +) + +func (m *Master) serviceWriterLoop(stop chan struct{}) { + for { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. + // TODO: add endpoints of all replicas, not just the elected master. + if m.readWriteServer != "" { + if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil { + glog.Errorf("Can't create rw service: %v", err) + } + if err := m.setEndpoints("kubernetes", []string{m.readWriteServer}); err != nil { + glog.Errorf("Can't create rw endpoints: %v", err) + } + } else { + m.deleteMasterService("kubernetes") + } + if m.readOnlyServer != "" { + if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil { + glog.Errorf("Can't create ro service: %v", err) + } + if err := m.setEndpoints("kubernetes-ro", []string{m.readOnlyServer}); err != nil { + glog.Errorf("Can't create rw endpoints: %v", err) + } + } else { + m.deleteMasterService("kubernetes-ro") + } + + select { + case <-stop: + return + case <-time.After(10 * time.Second): + } + } +} + +// createMasterServiceIfNeeded will create the specified service if it +// doesn't already exist. +func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error { + ctx := api.NewDefaultContext() + if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil { + // The service already exists. + return nil + } + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: serviceName, + Namespace: "default", + }, + Port: port, + // We're going to add the endpoints by hand, so this selector is mainly to + // prevent identification of other pods. This selector will be useful when + // we start hosting apiserver in a pod. + Selector: map[string]string{"provider": "kubernetes", "component": "apiserver"}, + } + // Kids, don't do this at home: this is a hack. There's no good way to call the business + // logic which lives in the REST object from here. + c, err := m.storage["services"].Create(ctx, svc) + if err != nil { + return err + } + resp := <-c + if _, ok := resp.(*api.Service); ok { + // If all worked, we get back an *api.Service object. + return nil + } + return fmt.Errorf("Unexpected response: %#v", resp) +} + +func (m *Master) deleteMasterService(serviceName string) { + ctx := api.NewDefaultContext() + m.serviceRegistry.DeleteService(ctx, serviceName) +} + +// setEndpoints sets the endpoints for the given service. +func (m *Master) setEndpoints(serviceName string, endpoints []string) error { + ctx := api.NewDefaultContext() + e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) + if err != nil { + e = &api.Endpoints{} + // Fill in ID if it didn't exist already + e.ObjectMeta.Name = serviceName + e.ObjectMeta.Namespace = "default" + } + e.Endpoints = endpoints + return m.endpointRegistry.UpdateEndpoints(ctx, e) +} diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 6aefe50839d..8bd1652caf6 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -48,6 +48,9 @@ type REST struct { func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST { // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) ipa := newIPAllocator(portalNet) + if ipa == nil { + glog.Fatalf("Failed to create an IP allocator. Is subnet '%v' valid?", portalNet) + } reloadIPsFromStorage(ipa, registry) return &REST{ diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 75fb80dff8e..f76cec32aa3 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -51,6 +51,12 @@ func (e *EndpointController) SyncServiceEndpoints() error { } var resultErr error for _, service := range services.Items { + if service.Name == "kubernetes" || service.Name == "kubernetes-ro" { + // This is a temporary hack for supporting the master services + // until we actually start running apiserver in a pod. + continue + } + glog.Infof("About to update endpoints for service %v", service.Name) pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Selector).AsSelector()) if err != nil { glog.Errorf("Error syncing service: %#v, skipping.", service) From 35bd8d4a11c879e045447742fba12c6ca9155854 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:57:28 -0700 Subject: [PATCH 6/9] Add e2e and integration tests. --- cmd/e2e/e2e.go | 35 ++++++++++++++- cmd/integration/integration.go | 79 +++++++++++++++++++++++++++++++++ test/integration/client_test.go | 9 ++++ 3 files changed, 122 insertions(+), 1 deletion(-) diff --git a/cmd/e2e/e2e.go b/cmd/e2e/e2e.go index d6dbae931fb..a7583ba8436 100644 --- a/cmd/e2e/e2e.go +++ b/cmd/e2e/e2e.go @@ -98,6 +98,38 @@ func loadClientOrDie() *client.Client { return c } +func TestKubernetesROService(c *client.Client) bool { + svc := api.ServiceList{} + err := c.Get(). + Namespace("default"). + AbsPath("/api/v1beta1/proxy/services/kubernetes-ro/api/v1beta1/services"). + Do(). + Into(&svc) + if err != nil { + glog.Errorf("unexpected error listing services using ro service: %v", err) + return false + } + var foundRW, foundRO bool + for i := range svc.Items { + if svc.Items[i].Name == "kubernetes" { + foundRW = true + } + if svc.Items[i].Name == "kubernetes-ro" { + foundRO = true + } + } + if !foundRW { + glog.Error("no RW service found") + } + if !foundRO { + glog.Error("no RO service found") + } + if !foundRW || !foundRO { + return false + } + return true +} + func TestPodUpdate(c *client.Client) bool { podClient := c.Pods(api.NamespaceDefault) @@ -158,7 +190,8 @@ func main() { c := loadClientOrDie() tests := []func(c *client.Client) bool{ - // TODO(brendandburns): fix this test and re-add it: TestPodUpdate, + TestKubernetesROService, + // TODO(brendandburns): fix this test and re-add it: TestPodUpdate, } passed := true diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index a29eb8c6a71..dcb787a7b08 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -27,6 +27,8 @@ import ( "os" "reflect" "runtime" + "strconv" + "strings" "sync" "time" @@ -131,6 +133,16 @@ func startComponents(manifestURL string) (apiServerURL string) { if err != nil { glog.Fatalf("Unable to parse CIDR: %v", err) } + glog.Infof("Using portalNet '%v'", portalNet) + + host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://")) + if err != nil { + glog.Fatalf("Unable to parse URL '%v': %v", apiServer.URL, err) + } + portNumber, err := strconv.Atoi(port) + if err != nil { + glog.Fatalf("Nonnumeric port? %v", err) + } mux := http.NewServeMux() // Create a master and install handlers into mux. master.New(&master.Config{ @@ -142,6 +154,10 @@ func startComponents(manifestURL string) (apiServerURL string) { Mux: mux, EnableLogsSupport: false, APIPrefix: "/api", + + ReadWritePort: portNumber, + ReadOnlyPort: portNumber, + PublicAddress: host, }) handler.delegate = mux @@ -342,6 +358,68 @@ func runAtomicPutTest(c *client.Client) { glog.Info("Atomic PUTs work.") } +func runMasterServiceTest(client *client.Client) { + time.Sleep(12 * time.Second) + var svcList api.ServiceList + err := client.Get(). + Namespace("default"). + Path("services"). + Do(). + Into(&svcList) + if err != nil { + glog.Fatalf("unexpected error listing services: %v", err) + } + var foundRW, foundRO bool + found := util.StringSet{} + for i := range svcList.Items { + found.Insert(svcList.Items[i].Name) + if svcList.Items[i].Name == "kubernetes" { + foundRW = true + } + if svcList.Items[i].Name == "kubernetes-ro" { + foundRO = true + } + } + if foundRW { + var ep api.Endpoints + err := client.Get(). + Namespace("default"). + Path("endpoints"). + Path("kubernetes"). + Do(). + Into(&ep) + if err != nil { + glog.Fatalf("unexpected error listing endpoints for kubernetes service: %v", err) + } + if len(ep.Endpoints) == 0 { + glog.Fatalf("no endpoints for kubernetes service: %v", ep) + } + } else { + glog.Errorf("no RW service found: %v", found) + } + if foundRO { + var ep api.Endpoints + err := client.Get(). + Namespace("default"). + Path("endpoints"). + Path("kubernetes-ro"). + Do(). + Into(&ep) + if err != nil { + glog.Fatalf("unexpected error listing endpoints for kubernetes service: %v", err) + } + if len(ep.Endpoints) == 0 { + glog.Fatalf("no endpoints for kubernetes service: %v", ep) + } + } else { + glog.Errorf("no RO service found: %v", found) + } + if !foundRW || !foundRO { + glog.Fatalf("Kubernetes service test failed: %v", found) + } + glog.Infof("Master service test passed.") +} + func runServiceTest(client *client.Client) { pod := api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -438,6 +516,7 @@ func main() { runAtomicPutTest, runServiceTest, runAPIVersionsTest, + runMasterServiceTest, } var wg sync.WaitGroup wg.Add(len(testFuncs)) diff --git a/test/integration/client_test.go b/test/integration/client_test.go index a8de250deca..9cac0f3d5a6 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -19,6 +19,7 @@ limitations under the License. package integration import ( + "net" "net/http" "net/http/httptest" "reflect" @@ -29,6 +30,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" + + "github.com/golang/glog" ) func init() { @@ -41,8 +44,14 @@ func TestClient(t *testing.T) { t.Fatalf("unexpected error: %v", err) } mux := http.NewServeMux() + + _, portalNet, err := net.ParseCIDR("10.0.0.0/24") + if err != nil { + glog.Fatalf("Unable to parse CIDR: %v", err) + } master.New(&master.Config{ EtcdHelper: helper, + PortalNet: portalNet, Mux: mux, EnableLogsSupport: false, EnableUISupport: false, From a036ebc1be5619b395c67d2088bffb98a5bfc8d8 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 28 Oct 2014 16:49:52 -0700 Subject: [PATCH 7/9] Switch models. No master election. --- cmd/apiserver/apiserver.go | 8 +++--- pkg/master/master.go | 43 +++++++++++++--------------- pkg/master/publish.go | 57 +++++++++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 44 deletions(-) diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 7961316c68a..ac06f668370 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -44,7 +44,7 @@ import ( var ( // Note: the weird ""+ in below lines seems to be the only way to get gofmt to // arrange these text blocks sensibly. Grrr. - port = flag.Uint("port", 8080, ""+ + port = flag.Int("port", 8080, ""+ "The port to listen on. Default 8080. It is assumed that firewall rules are "+ "set up such that this port is not reachable from outside of the cluster. It is "+ "further assumed that port 443 on the cluster's public address is proxied to this "+ @@ -55,7 +55,7 @@ var ( "and it is assumed that port 443 at this address will be proxied/redirected "+ "to '-address':'-port'. If blank, the address in the first listed interface "+ "will be used.") - readOnlyPort = flag.Uint("read_only_port", 7080, ""+ + readOnlyPort = flag.Int("read_only_port", 7080, ""+ "The port from which to serve read-only resources. If 0, don't serve on a "+ "read-only address. It is assumed that firewall rules are set up such that "+ "this port is not reachable from outside of the cluster.") @@ -222,8 +222,8 @@ func main() { CorsAllowedOriginList: corsAllowedOriginList, TokenAuthFile: *tokenAuthFile, - ReadOnlyPort: int(*readOnlyPort), - ReadWritePort: int(*port), + ReadOnlyPort: *readOnlyPort, + ReadWritePort: *port, PublicAddress: *publicAddressOverride, } m := master.New(config) diff --git a/pkg/master/master.go b/pkg/master/master.go index 09b9420ebb1..a014d41eaf6 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -21,7 +21,6 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -36,7 +35,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/election" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" @@ -74,6 +72,10 @@ type Config struct { CorsAllowedOriginList util.StringList TokenAuthFile string + // Number of masters running; all masters must be started with the + // same value for this field. (Numbers > 1 currently untested.) + MasterCount int + // The port on PublicAddress where a read-only server will be installed. // Defaults to 7080 if not set. ReadOnlyPort int @@ -104,18 +106,14 @@ type Master struct { apiPrefix string corsAllowedOriginList util.StringList tokenAuthFile string + masterCount int // "Outputs" Handler http.Handler - elector election.MasterElector - readOnlyServer string - readWriteServer string - electedMasterServices *util.Runner - - // lock must be held when accessing the below read-write members. - lock sync.RWMutex - electedMaster election.Master + readOnlyServer string + readWriteServer string + masterServices *util.Runner } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -133,6 +131,10 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe // setDefaults fills in any fields not set that are required to have valid data. func setDefaults(c *Config) { + if c.MasterCount == 0 { + // Clearly, there will be at least one master. + c.MasterCount = 1 + } if c.ReadOnlyPort == 0 { c.ReadOnlyPort = 7080 } @@ -140,6 +142,9 @@ func setDefaults(c *Config) { c.ReadWritePort = 443 } if c.PublicAddress == "" { + // Find and use the first non-loopback address. + // TODO: potentially it'd be useful to skip the docker interface if it + // somehow is first in the list. addrs, err := net.InterfaceAddrs() if err != nil { glog.Fatalf("Unable to get network interfaces: error='%v'", err) @@ -190,11 +195,11 @@ func New(c *Config) *Master { apiPrefix: c.APIPrefix, corsAllowedOriginList: c.CorsAllowedOriginList, tokenAuthFile: c.TokenAuthFile, - elector: election.NewEtcdMasterElector(c.EtcdHelper.Client), + masterCount: c.MasterCount, readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))), } - m.electedMasterServices = util.NewRunner(m.serviceWriterLoop, m.electionAnnounce) + m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop) m.init(c) return m } @@ -281,18 +286,8 @@ func (m *Master) init(c *Config) { m.Handler = handler - if m.readWriteServer != "" { - glog.Infof("Starting election services as %v", m.readWriteServer) - go election.Notify(m.elector, "/registry/elections/k8smaster", m.readWriteServer, m.electedMasterServices) - } - - // TODO: start a goroutine to report ourselves to the elected master. -} - -func (m *Master) electionAnnounce(stop chan struct{}) { - glog.Infof("Elected as master") - <-stop - glog.Info("Lost election for master") + // TODO: Attempt clean shutdown? + m.masterServices.Start() } // API_v1beta1 returns the resources and codec for API version v1beta1. diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 6fc19d26cb3..83d3447ca06 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -35,21 +35,31 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil { glog.Errorf("Can't create rw service: %v", err) } - if err := m.setEndpoints("kubernetes", []string{m.readWriteServer}); err != nil { + if err := m.ensureEndpointsContain("kubernetes", m.readWriteServer); err != nil { glog.Errorf("Can't create rw endpoints: %v", err) } - } else { - m.deleteMasterService("kubernetes") } + + select { + case <-stop: + return + case <-time.After(10 * time.Second): + } + } +} + +func (m *Master) roServiceWriterLoop(stop chan struct{}) { + for { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. if m.readOnlyServer != "" { if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil { glog.Errorf("Can't create ro service: %v", err) } - if err := m.setEndpoints("kubernetes-ro", []string{m.readOnlyServer}); err != nil { - glog.Errorf("Can't create rw endpoints: %v", err) + if err := m.ensureEndpointsContain("kubernetes-ro", m.readOnlyServer); err != nil { + glog.Errorf("Can't create ro endpoints: %v", err) } - } else { - m.deleteMasterService("kubernetes-ro") } select { @@ -86,20 +96,18 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error return err } resp := <-c - if _, ok := resp.(*api.Service); ok { + if _, ok := resp.Object.(*api.Service); ok { // If all worked, we get back an *api.Service object. return nil } return fmt.Errorf("Unexpected response: %#v", resp) } -func (m *Master) deleteMasterService(serviceName string) { - ctx := api.NewDefaultContext() - m.serviceRegistry.DeleteService(ctx, serviceName) -} - -// setEndpoints sets the endpoints for the given service. -func (m *Master) setEndpoints(serviceName string, endpoints []string) error { +// ensureEndpointsContain sets the endpoints for the given service. Also removes +// excess endpoints (as determined by m.masterCount). Extra endpoints could appear +// in the list if, for example, the master starts running on a different machine, +// changing IP addresses. +func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { @@ -108,6 +116,23 @@ func (m *Master) setEndpoints(serviceName string, endpoints []string) error { e.ObjectMeta.Name = serviceName e.ObjectMeta.Namespace = "default" } - e.Endpoints = endpoints + found := false + for i := range e.Endpoints { + if e.Endpoints[i] == endpoint { + found = true + break + } + } + if !found { + e.Endpoints = append(e.Endpoints, endpoint) + } + if len(e.Endpoints) > m.masterCount { + // We append to the end and remove from the beginning, so this should + // converge rapidly with all masters performing this operation. + e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:] + } else if found { + // We didn't make any changes, no need to actually call update. + return nil + } return m.endpointRegistry.UpdateEndpoints(ctx, e) } From 858b557bbbd48d2908495865fafd7c6ebce5041a Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 28 Oct 2014 16:50:38 -0700 Subject: [PATCH 8/9] Undemocratically remove unused election code. @brendandburns: leaving this as a separate commit so you can get it back easily when you're ready to do something with it. --- pkg/election/doc.go | 18 --- pkg/election/etcd_master.go | 185 ------------------------------- pkg/election/etcd_master_test.go | 98 ---------------- pkg/election/fake.go | 53 --------- pkg/election/master.go | 110 ------------------ pkg/election/master_test.go | 82 -------------- 6 files changed, 546 deletions(-) delete mode 100644 pkg/election/doc.go delete mode 100644 pkg/election/etcd_master.go delete mode 100644 pkg/election/etcd_master_test.go delete mode 100644 pkg/election/fake.go delete mode 100644 pkg/election/master.go delete mode 100644 pkg/election/master_test.go diff --git a/pkg/election/doc.go b/pkg/election/doc.go deleted file mode 100644 index 6982d3ec8aa..00000000000 --- a/pkg/election/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package election provides interfaces used for master election. -package election diff --git a/pkg/election/etcd_master.go b/pkg/election/etcd_master.go deleted file mode 100644 index 35775f2a03f..00000000000 --- a/pkg/election/etcd_master.go +++ /dev/null @@ -1,185 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "fmt" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" - "github.com/golang/glog" -) - -// Master is used to announce the current elected master. -type Master string - -// IsAnAPIObject is used solely so we can work with the watch package. -// TODO: Either fix watch so this isn't necessary, or make this a real API Object. -// TODO: when it becomes clear how this package will be used, move these declarations to -// to the proper place. -func (Master) IsAnAPIObject() {} - -// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd. -func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector { - return &etcdMasterElector{etcd: h} -} - -type empty struct{} - -// internal implementation struct -type etcdMasterElector struct { - etcd tools.EtcdGetSet - done chan empty - events chan watch.Event -} - -// Elect implements the election.MasterElector interface. -func (e *etcdMasterElector) Elect(path, id string) watch.Interface { - e.done = make(chan empty) - e.events = make(chan watch.Event) - go util.Forever(func() { e.run(path, id) }, time.Second*5) - return e -} - -func (e *etcdMasterElector) run(path, id string) { - masters := make(chan string) - errors := make(chan error) - go e.master(path, id, 30, masters, errors, e.done) - for { - select { - case m := <-masters: - e.events <- watch.Event{ - Type: watch.Modified, - Object: Master(m), - } - case e := <-errors: - glog.Errorf("error in election: %v", e) - } - } -} - -// ResultChan implements the watch.Interface interface. -func (e *etcdMasterElector) ResultChan() <-chan watch.Event { - return e.events -} - -// extendMaster attempts to extend ownership of a master lock for TTL seconds. -// returns "", nil if extension failed -// returns id, nil if extension succeeded -// returns "", err if an error occurred -func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.Response) (string, error) { - // If it matches the passed in id, extend the lease by writing a new entry. - // Uses compare and swap, so that if we TTL out in the meantime, the write will fail. - // We don't handle the TTL delete w/o a write case here, it's handled in the next loop - // iteration. - _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) - if err != nil && !tools.IsEtcdTestFailed(err) { - return "", err - } - if err != nil && tools.IsEtcdTestFailed(err) { - return "", nil - } - return id, nil -} - -// becomeMaster attempts to become the master for this lock. -// returns "", nil if the attempt failed -// returns id, nil if the attempt succeeded -// returns "", err if an error occurred -func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { - _, err := e.etcd.Create(path, id, ttl) - if err != nil && !tools.IsEtcdNodeExist(err) { - // unexpected error - return "", err - } - if err != nil && tools.IsEtcdNodeExist(err) { - return "", nil - } - return id, nil -} - -// handleMaster performs one loop of master locking. -// on success it returns , nil -// on error it returns "", err -// in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock) -// it returns "", nil -func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) { - res, err := e.etcd.Get(path, false, false) - - // Unexpected error, bail out - if err != nil && !tools.IsEtcdNotFound(err) { - return "", err - } - - // There is no master, try to become the master. - if err != nil && tools.IsEtcdNotFound(err) { - return e.becomeMaster(path, id, ttl) - } - - // This should never happen. - if res.Node == nil { - return "", fmt.Errorf("unexpected response: %#v", res) - } - - // We're not the master, just return the current value - if res.Node.Value != id { - return res.Node.Value, nil - } - - // We are the master, try to extend out lease - return e.extendMaster(path, id, ttl, res) -} - -// master provices a distributed master election lock, maintains lock until failure, or someone sends something in the done channel. -// The basic algorithm is: -// while !done -// Get the current master -// If there is no current master -// Try to become the master -// Otherwise -// If we are the master, extend the lease -// If the master is different than the last time through the loop, report the master -// Sleep 80% of TTL -func (e *etcdMasterElector) master(path, id string, ttl uint64, masters chan<- string, errors chan<- error, done <-chan empty) { - lastMaster := "" - for { - master, err := e.handleMaster(path, id, ttl) - if err != nil { - errors <- err - } else if len(master) == 0 { - continue - } else if master != lastMaster { - lastMaster = master - masters <- master - } - // TODO: Add Watch here, skip the polling for faster reactions - // If done is closed, break out. - select { - case <-done: - return - case <-time.After(time.Duration((ttl*8)/10) * time.Second): - } - } -} - -// ResultChan implements the watch.Interface interface -func (e *etcdMasterElector) Stop() { - close(e.done) -} diff --git a/pkg/election/etcd_master_test.go b/pkg/election/etcd_master_test.go deleted file mode 100644 index a34b3eb3c8c..00000000000 --- a/pkg/election/etcd_master_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" -) - -func TestEtcdMasterOther(t *testing.T) { - path := "foo" - etcd := tools.NewFakeEtcdClient(t) - etcd.Set(path, "baz", 0) - master := NewEtcdMasterElector(etcd) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "baz" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} - -func TestEtcdMasterNoOther(t *testing.T) { - path := "foo" - e := tools.NewFakeEtcdClient(t) - e.TestIndex = true - e.Data["foo"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNotFound, - }, - } - master := NewEtcdMasterElector(e) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "bar" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} - -func TestEtcdMasterNoOtherThenConflict(t *testing.T) { - path := "foo" - e := tools.NewFakeEtcdClient(t) - e.TestIndex = true - // Ok, so we set up a chain of responses from etcd: - // 1) Nothing there - // 2) conflict (someone else wrote) - // 3) new value (the data they wrote) - empty := tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNotFound, - }, - } - empty.N = &tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNodeExist, - }, - } - empty.N.N = &tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: "baz", - }, - }, - } - e.Data["foo"] = empty - master := NewEtcdMasterElector(e) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "bar" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} diff --git a/pkg/election/fake.go b/pkg/election/fake.go deleted file mode 100644 index 27852b62bbf..00000000000 --- a/pkg/election/fake.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -// Fake allows for testing of anything consuming a MasterElector. -type Fake struct { - mux *watch.Mux - currentMaster Master - lock sync.Mutex // Protect access of currentMaster -} - -// NewFake makes a new fake MasterElector. -func NewFake() *Fake { - // 0 means block for clients. - return &Fake{mux: watch.NewMux(0)} -} - -func (f *Fake) ChangeMaster(newMaster Master) { - f.lock.Lock() - defer f.lock.Unlock() - f.mux.Action(watch.Modified, newMaster) - f.currentMaster = newMaster -} - -func (f *Fake) Elect(path, id string) watch.Interface { - f.lock.Lock() - defer f.lock.Unlock() - w := f.mux.Watch() - if f.currentMaster != "" { - f.mux.Action(watch.Modified, f.currentMaster) - } - return w -} diff --git a/pkg/election/master.go b/pkg/election/master.go deleted file mode 100644 index 7cb59ffce2b..00000000000 --- a/pkg/election/master.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - - "github.com/golang/glog" -) - -// MasterElector is an interface for services that can elect masters. -// Important Note: MasterElectors are not inter-operable, all participants in the election need to be -// using the same underlying implementation of this interface for correct behavior. -type MasterElector interface { - // RequestMaster makes the caller represented by 'id' enter into a master election for the - // distributed lock defined by 'path' - // The returned watch.Interface provides a stream of Master objects which - // contain the current master. - // Calling Stop on the returned interface relinquishes ownership (if currently possesed) - // and removes the caller from the election - Elect(path, id string) watch.Interface -} - -// Service represents anything that can start and stop on demand. -type Service interface { - Start() - Stop() -} - -type notifier struct { - lock sync.Mutex - cond *sync.Cond - - // desired is updated with every change, current is updated after - // Start()/Stop() finishes. 'cond' is used to signal that a change - // might be needed. This handles the case where mastership flops - // around without calling Start()/Stop() excessively. - desired, current Master - - // for comparison, to see if we are master. - id Master - - service Service -} - -// Notify runs Elect() on m, and calls Start()/Stop() on s when the -// elected master starts/stops matching 'id'. Never returns. -func Notify(m MasterElector, path, id string, s Service) { - n := ¬ifier{id: Master(id), service: s} - n.cond = sync.NewCond(&n.lock) - go n.serviceLoop() - for { - w := m.Elect(path, id) - for { - event, open := <-w.ResultChan() - if !open { - break - } - if event.Type != watch.Modified { - continue - } - electedMaster, ok := event.Object.(Master) - if !ok { - glog.Errorf("Unexpected object from election channel: %v", event.Object) - break - } - func() { - n.lock.Lock() - defer n.lock.Unlock() - n.desired = electedMaster - if n.desired != n.current { - n.cond.Signal() - } - }() - } - } -} - -// serviceLoop waits for changes, and calls Start()/Stop() as needed. -func (n *notifier) serviceLoop() { - n.lock.Lock() - defer n.lock.Unlock() - for { - for n.desired == n.current { - n.cond.Wait() - } - if n.current != n.id && n.desired == n.id { - n.service.Start() - } else if n.current == n.id && n.desired != n.id { - n.service.Stop() - } - n.current = n.desired - } -} diff --git a/pkg/election/master_test.go b/pkg/election/master_test.go deleted file mode 100644 index 478602b5cec..00000000000 --- a/pkg/election/master_test.go +++ /dev/null @@ -1,82 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "testing" - "time" -) - -type slowService struct { - t *testing.T - on bool - // We explicitly have no lock to prove that - // Start and Stop are not called concurrently. - changes chan<- bool -} - -func (s *slowService) Start() { - if s.on { - s.t.Errorf("started already on service") - } - time.Sleep(2 * time.Millisecond) - s.on = true - s.changes <- true -} - -func (s *slowService) Stop() { - if !s.on { - s.t.Errorf("stopped already off service") - } - time.Sleep(2 * time.Millisecond) - s.on = false - s.changes <- false -} - -func Test(t *testing.T) { - m := NewFake() - changes := make(chan bool, 1500) - s := &slowService{t: t, changes: changes} - go Notify(m, "", "me", s) - - done := make(chan struct{}) - go func() { - for i := 0; i < 500; i++ { - for _, key := range []string{"me", "notme", "alsonotme"} { - m.ChangeMaster(Master(key)) - } - } - close(done) - }() - - <-done - time.Sleep(8 * time.Millisecond) - close(changes) - - changeList := []bool{} - for { - change, ok := <-changes - if !ok { - break - } - changeList = append(changeList, change) - } - - if len(changeList) > 1000 { - t.Errorf("unexpected number of changes: %v", len(changeList)) - } -} From 070c6c044090229068db43b61257f7ada5d15043 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 29 Oct 2014 12:27:35 -0700 Subject: [PATCH 9/9] Fix missing portalNets in tests once and for all by adding a default. --- cmd/integration/integration.go | 7 ------- pkg/master/master.go | 9 +++++++++ test/integration/client_test.go | 8 -------- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index dcb787a7b08..6aec0b8e6de 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -129,12 +129,6 @@ func startComponents(manifestURL string) (apiServerURL string) { } // Master - _, portalNet, err := net.ParseCIDR("10.0.0.0/24") - if err != nil { - glog.Fatalf("Unable to parse CIDR: %v", err) - } - glog.Infof("Using portalNet '%v'", portalNet) - host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://")) if err != nil { glog.Fatalf("Unable to parse URL '%v': %v", apiServer.URL, err) @@ -150,7 +144,6 @@ func startComponents(manifestURL string) (apiServerURL string) { EtcdHelper: helper, Minions: machineList, KubeletClient: fakeKubeletClient{}, - PortalNet: portalNet, Mux: mux, EnableLogsSupport: false, APIPrefix: "/api", diff --git a/pkg/master/master.go b/pkg/master/master.go index a014d41eaf6..ea62613a87c 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -131,6 +131,15 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe // setDefaults fills in any fields not set that are required to have valid data. func setDefaults(c *Config) { + if c.PortalNet == nil { + defaultNet := "10.0.0.0/24" + glog.Warningf("Portal net unspecified. Defaulting to %v.", defaultNet) + _, portalNet, err := net.ParseCIDR(defaultNet) + if err != nil { + glog.Fatalf("Unable to parse CIDR: %v", err) + } + c.PortalNet = portalNet + } if c.MasterCount == 0 { // Clearly, there will be at least one master. c.MasterCount = 1 diff --git a/test/integration/client_test.go b/test/integration/client_test.go index 9cac0f3d5a6..821564f8504 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -19,7 +19,6 @@ limitations under the License. package integration import ( - "net" "net/http" "net/http/httptest" "reflect" @@ -30,8 +29,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" - - "github.com/golang/glog" ) func init() { @@ -45,13 +42,8 @@ func TestClient(t *testing.T) { } mux := http.NewServeMux() - _, portalNet, err := net.ParseCIDR("10.0.0.0/24") - if err != nil { - glog.Fatalf("Unable to parse CIDR: %v", err) - } master.New(&master.Config{ EtcdHelper: helper, - PortalNet: portalNet, Mux: mux, EnableLogsSupport: false, EnableUISupport: false,