diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 9017630dda6..ac06f668370 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -42,9 +42,23 @@ import ( ) var ( - port = flag.Uint("port", 8080, "The port to listen on. Default 8080") + // Note: the weird ""+ in below lines seems to be the only way to get gofmt to + // arrange these text blocks sensibly. Grrr. + port = flag.Int("port", 8080, ""+ + "The port to listen on. Default 8080. It is assumed that firewall rules are "+ + "set up such that this port is not reachable from outside of the cluster. It is "+ + "further assumed that port 443 on the cluster's public address is proxied to this "+ + "port. This is performed by nginx in the default setup.") address = util.IP(net.ParseIP("127.0.0.1")) - readOnlyPort = flag.Uint("read_only_port", 7080, "The port from which to serve read-only resources. If 0, don't serve on a read-only address.") + publicAddressOverride = flag.String("public_address_override", "", ""+ + "Public serving address. Read only port will be opened on this address, "+ + "and it is assumed that port 443 at this address will be proxied/redirected "+ + "to '-address':'-port'. If blank, the address in the first listed interface "+ + "will be used.") + readOnlyPort = flag.Int("read_only_port", 7080, ""+ + "The port from which to serve read-only resources. If 0, don't serve on a "+ + "read-only address. It is assumed that firewall rules are set up such that "+ + "this port is not reachable from outside of the cluster.") apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") @@ -184,7 +198,7 @@ func main() { n := net.IPNet(portalNet) mux := http.NewServeMux() - m := master.New(&master.Config{ + config := &master.Config{ Client: client, Cloud: cloud, EtcdHelper: helper, @@ -207,13 +221,26 @@ func main() { APIPrefix: *apiPrefix, CorsAllowedOriginList: corsAllowedOriginList, TokenAuthFile: *tokenAuthFile, - }) + ReadOnlyPort: *readOnlyPort, + ReadWritePort: *port, + PublicAddress: *publicAddressOverride, + } + m := master.New(config) + + roLocation := "" if *readOnlyPort != 0 { + roLocation = net.JoinHostPort(config.PublicAddress, strconv.Itoa(config.ReadOnlyPort)) + } + rwLocation := net.JoinHostPort(address.String(), strconv.Itoa(int(*port))) + + // See the flag commentary to understand our assumptions when opening the read-only and read-write ports. + + if roLocation != "" { // Allow 1 read-only request per second, allow up to 20 in a burst before enforcing. rl := util.NewTokenBucketRateLimiter(1.0, 20) readOnlyServer := &http.Server{ - Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*readOnlyPort))), + Addr: roLocation, Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.Handler))), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, @@ -226,7 +253,7 @@ func main() { } s := &http.Server{ - Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))), + Addr: rwLocation, Handler: apiserver.RecoverPanics(m.Handler), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, diff --git a/cmd/e2e/e2e.go b/cmd/e2e/e2e.go index d6dbae931fb..a7583ba8436 100644 --- a/cmd/e2e/e2e.go +++ b/cmd/e2e/e2e.go @@ -98,6 +98,38 @@ func loadClientOrDie() *client.Client { return c } +func TestKubernetesROService(c *client.Client) bool { + svc := api.ServiceList{} + err := c.Get(). + Namespace("default"). + AbsPath("/api/v1beta1/proxy/services/kubernetes-ro/api/v1beta1/services"). + Do(). + Into(&svc) + if err != nil { + glog.Errorf("unexpected error listing services using ro service: %v", err) + return false + } + var foundRW, foundRO bool + for i := range svc.Items { + if svc.Items[i].Name == "kubernetes" { + foundRW = true + } + if svc.Items[i].Name == "kubernetes-ro" { + foundRO = true + } + } + if !foundRW { + glog.Error("no RW service found") + } + if !foundRO { + glog.Error("no RO service found") + } + if !foundRW || !foundRO { + return false + } + return true +} + func TestPodUpdate(c *client.Client) bool { podClient := c.Pods(api.NamespaceDefault) @@ -158,7 +190,8 @@ func main() { c := loadClientOrDie() tests := []func(c *client.Client) bool{ - // TODO(brendandburns): fix this test and re-add it: TestPodUpdate, + TestKubernetesROService, + // TODO(brendandburns): fix this test and re-add it: TestPodUpdate, } passed := true diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index a29eb8c6a71..6aec0b8e6de 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -27,6 +27,8 @@ import ( "os" "reflect" "runtime" + "strconv" + "strings" "sync" "time" @@ -127,9 +129,13 @@ func startComponents(manifestURL string) (apiServerURL string) { } // Master - _, portalNet, err := net.ParseCIDR("10.0.0.0/24") + host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://")) if err != nil { - glog.Fatalf("Unable to parse CIDR: %v", err) + glog.Fatalf("Unable to parse URL '%v': %v", apiServer.URL, err) + } + portNumber, err := strconv.Atoi(port) + if err != nil { + glog.Fatalf("Nonnumeric port? %v", err) } mux := http.NewServeMux() // Create a master and install handlers into mux. @@ -138,10 +144,13 @@ func startComponents(manifestURL string) (apiServerURL string) { EtcdHelper: helper, Minions: machineList, KubeletClient: fakeKubeletClient{}, - PortalNet: portalNet, Mux: mux, EnableLogsSupport: false, APIPrefix: "/api", + + ReadWritePort: portNumber, + ReadOnlyPort: portNumber, + PublicAddress: host, }) handler.delegate = mux @@ -342,6 +351,68 @@ func runAtomicPutTest(c *client.Client) { glog.Info("Atomic PUTs work.") } +func runMasterServiceTest(client *client.Client) { + time.Sleep(12 * time.Second) + var svcList api.ServiceList + err := client.Get(). + Namespace("default"). + Path("services"). + Do(). + Into(&svcList) + if err != nil { + glog.Fatalf("unexpected error listing services: %v", err) + } + var foundRW, foundRO bool + found := util.StringSet{} + for i := range svcList.Items { + found.Insert(svcList.Items[i].Name) + if svcList.Items[i].Name == "kubernetes" { + foundRW = true + } + if svcList.Items[i].Name == "kubernetes-ro" { + foundRO = true + } + } + if foundRW { + var ep api.Endpoints + err := client.Get(). + Namespace("default"). + Path("endpoints"). + Path("kubernetes"). + Do(). + Into(&ep) + if err != nil { + glog.Fatalf("unexpected error listing endpoints for kubernetes service: %v", err) + } + if len(ep.Endpoints) == 0 { + glog.Fatalf("no endpoints for kubernetes service: %v", ep) + } + } else { + glog.Errorf("no RW service found: %v", found) + } + if foundRO { + var ep api.Endpoints + err := client.Get(). + Namespace("default"). + Path("endpoints"). + Path("kubernetes-ro"). + Do(). + Into(&ep) + if err != nil { + glog.Fatalf("unexpected error listing endpoints for kubernetes service: %v", err) + } + if len(ep.Endpoints) == 0 { + glog.Fatalf("no endpoints for kubernetes service: %v", ep) + } + } else { + glog.Errorf("no RO service found: %v", found) + } + if !foundRW || !foundRO { + glog.Fatalf("Kubernetes service test failed: %v", found) + } + glog.Infof("Master service test passed.") +} + func runServiceTest(client *client.Client) { pod := api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -438,6 +509,7 @@ func main() { runAtomicPutTest, runServiceTest, runAPIVersionsTest, + runMasterServiceTest, } var wg sync.WaitGroup wg.Add(len(testFuncs)) diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 879a88819fb..af734f7f7f7 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -113,10 +113,16 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location, err := redirector.ResourceLocation(ctx, id) if err != nil { + httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) return } + if location == "" { + httplog.LogOf(req, w).Addf("ResourceLocation for %v returned ''", id) + notFound(w, req) + return + } destURL, err := url.Parse(location) if err != nil { @@ -124,11 +130,19 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { writeJSON(status.Code, r.codec, status, w) return } + if destURL.Scheme == "" { + // If no scheme was present in location, url.Parse sometimes mistakes + // hosts for paths. + destURL.Host = location + } destURL.Path = rest destURL.RawQuery = req.URL.RawQuery newReq, err := http.NewRequest(req.Method, destURL.String(), req.Body) if err != nil { - glog.Errorf("Failed to create request: %s", err) + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + notFound(w, req) + return } newReq.Header = req.Header diff --git a/pkg/apiserver/redirect.go b/pkg/apiserver/redirect.go index f8b499d4e73..a9832291f24 100644 --- a/pkg/apiserver/redirect.go +++ b/pkg/apiserver/redirect.go @@ -30,7 +30,11 @@ type RedirectHandler struct { } func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() + namespace := req.URL.Query().Get("namespace") + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } parts := splitPath(req.URL.Path) if len(parts) != 2 || req.Method != "GET" { notFound(w, req) diff --git a/pkg/apiserver/redirect_test.go b/pkg/apiserver/redirect_test.go index 072c388858c..a17a286a16f 100644 --- a/pkg/apiserver/redirect_test.go +++ b/pkg/apiserver/redirect_test.go @@ -27,6 +27,7 @@ import ( func TestRedirect(t *testing.T) { simpleStorage := &SimpleRESTStorage{ errors: map[string]error{}, + expectedResourceNamespace: "default", } handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, diff --git a/pkg/election/doc.go b/pkg/election/doc.go deleted file mode 100644 index 6982d3ec8aa..00000000000 --- a/pkg/election/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package election provides interfaces used for master election. -package election diff --git a/pkg/election/etcd_master.go b/pkg/election/etcd_master.go deleted file mode 100644 index 35775f2a03f..00000000000 --- a/pkg/election/etcd_master.go +++ /dev/null @@ -1,185 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "fmt" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" - "github.com/golang/glog" -) - -// Master is used to announce the current elected master. -type Master string - -// IsAnAPIObject is used solely so we can work with the watch package. -// TODO: Either fix watch so this isn't necessary, or make this a real API Object. -// TODO: when it becomes clear how this package will be used, move these declarations to -// to the proper place. -func (Master) IsAnAPIObject() {} - -// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd. -func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector { - return &etcdMasterElector{etcd: h} -} - -type empty struct{} - -// internal implementation struct -type etcdMasterElector struct { - etcd tools.EtcdGetSet - done chan empty - events chan watch.Event -} - -// Elect implements the election.MasterElector interface. -func (e *etcdMasterElector) Elect(path, id string) watch.Interface { - e.done = make(chan empty) - e.events = make(chan watch.Event) - go util.Forever(func() { e.run(path, id) }, time.Second*5) - return e -} - -func (e *etcdMasterElector) run(path, id string) { - masters := make(chan string) - errors := make(chan error) - go e.master(path, id, 30, masters, errors, e.done) - for { - select { - case m := <-masters: - e.events <- watch.Event{ - Type: watch.Modified, - Object: Master(m), - } - case e := <-errors: - glog.Errorf("error in election: %v", e) - } - } -} - -// ResultChan implements the watch.Interface interface. -func (e *etcdMasterElector) ResultChan() <-chan watch.Event { - return e.events -} - -// extendMaster attempts to extend ownership of a master lock for TTL seconds. -// returns "", nil if extension failed -// returns id, nil if extension succeeded -// returns "", err if an error occurred -func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.Response) (string, error) { - // If it matches the passed in id, extend the lease by writing a new entry. - // Uses compare and swap, so that if we TTL out in the meantime, the write will fail. - // We don't handle the TTL delete w/o a write case here, it's handled in the next loop - // iteration. - _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) - if err != nil && !tools.IsEtcdTestFailed(err) { - return "", err - } - if err != nil && tools.IsEtcdTestFailed(err) { - return "", nil - } - return id, nil -} - -// becomeMaster attempts to become the master for this lock. -// returns "", nil if the attempt failed -// returns id, nil if the attempt succeeded -// returns "", err if an error occurred -func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { - _, err := e.etcd.Create(path, id, ttl) - if err != nil && !tools.IsEtcdNodeExist(err) { - // unexpected error - return "", err - } - if err != nil && tools.IsEtcdNodeExist(err) { - return "", nil - } - return id, nil -} - -// handleMaster performs one loop of master locking. -// on success it returns , nil -// on error it returns "", err -// in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock) -// it returns "", nil -func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) { - res, err := e.etcd.Get(path, false, false) - - // Unexpected error, bail out - if err != nil && !tools.IsEtcdNotFound(err) { - return "", err - } - - // There is no master, try to become the master. - if err != nil && tools.IsEtcdNotFound(err) { - return e.becomeMaster(path, id, ttl) - } - - // This should never happen. - if res.Node == nil { - return "", fmt.Errorf("unexpected response: %#v", res) - } - - // We're not the master, just return the current value - if res.Node.Value != id { - return res.Node.Value, nil - } - - // We are the master, try to extend out lease - return e.extendMaster(path, id, ttl, res) -} - -// master provices a distributed master election lock, maintains lock until failure, or someone sends something in the done channel. -// The basic algorithm is: -// while !done -// Get the current master -// If there is no current master -// Try to become the master -// Otherwise -// If we are the master, extend the lease -// If the master is different than the last time through the loop, report the master -// Sleep 80% of TTL -func (e *etcdMasterElector) master(path, id string, ttl uint64, masters chan<- string, errors chan<- error, done <-chan empty) { - lastMaster := "" - for { - master, err := e.handleMaster(path, id, ttl) - if err != nil { - errors <- err - } else if len(master) == 0 { - continue - } else if master != lastMaster { - lastMaster = master - masters <- master - } - // TODO: Add Watch here, skip the polling for faster reactions - // If done is closed, break out. - select { - case <-done: - return - case <-time.After(time.Duration((ttl*8)/10) * time.Second): - } - } -} - -// ResultChan implements the watch.Interface interface -func (e *etcdMasterElector) Stop() { - close(e.done) -} diff --git a/pkg/election/etcd_master_test.go b/pkg/election/etcd_master_test.go deleted file mode 100644 index a34b3eb3c8c..00000000000 --- a/pkg/election/etcd_master_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" -) - -func TestEtcdMasterOther(t *testing.T) { - path := "foo" - etcd := tools.NewFakeEtcdClient(t) - etcd.Set(path, "baz", 0) - master := NewEtcdMasterElector(etcd) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "baz" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} - -func TestEtcdMasterNoOther(t *testing.T) { - path := "foo" - e := tools.NewFakeEtcdClient(t) - e.TestIndex = true - e.Data["foo"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNotFound, - }, - } - master := NewEtcdMasterElector(e) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "bar" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} - -func TestEtcdMasterNoOtherThenConflict(t *testing.T) { - path := "foo" - e := tools.NewFakeEtcdClient(t) - e.TestIndex = true - // Ok, so we set up a chain of responses from etcd: - // 1) Nothing there - // 2) conflict (someone else wrote) - // 3) new value (the data they wrote) - empty := tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNotFound, - }, - } - empty.N = &tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: &etcd.EtcdError{ - ErrorCode: tools.EtcdErrorCodeNodeExist, - }, - } - empty.N.N = &tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: "baz", - }, - }, - } - e.Data["foo"] = empty - master := NewEtcdMasterElector(e) - w := master.Elect(path, "bar") - result := <-w.ResultChan() - if result.Type != watch.Modified || result.Object.(Master) != "bar" { - t.Errorf("unexpected event: %#v", result) - } - w.Stop() -} diff --git a/pkg/election/master.go b/pkg/election/master.go deleted file mode 100644 index 14b16da5564..00000000000 --- a/pkg/election/master.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package election - -import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -// MasterElector is an interface for services that can elect masters. -// Important Note: MasterElectors are not inter-operable, all participants in the election need to be -// using the same underlying implementation of this interface for correct behavior. -type MasterElector interface { - // RequestMaster makes the caller represented by 'id' enter into a master election for the - // distributed lock defined by 'path' - // The returned watch.Interface provides a stream of Master objects which - // contain the current master. - // Calling Stop on the returned interface relinquishes ownership (if currently possesed) - // and removes the caller from the election - Elect(path, id string) watch.Interface -} diff --git a/pkg/master/master.go b/pkg/master/master.go index b14b3e8851a..ea62613a87c 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -19,6 +19,7 @@ package master import ( "net" "net/http" + "strconv" "strings" "time" @@ -70,6 +71,20 @@ type Config struct { APIPrefix string CorsAllowedOriginList util.StringList TokenAuthFile string + + // Number of masters running; all masters must be started with the + // same value for this field. (Numbers > 1 currently untested.) + MasterCount int + + // The port on PublicAddress where a read-only server will be installed. + // Defaults to 7080 if not set. + ReadOnlyPort int + // The port on PublicAddress where a read-write server will be installed. + // Defaults to 443 if not set. + ReadWritePort int + + // If empty, the first result from net.InterfaceAddrs will be used. + PublicAddress string } // Master contains state for a Kubernetes cluster master/api server. @@ -91,8 +106,14 @@ type Master struct { apiPrefix string corsAllowedOriginList util.StringList tokenAuthFile string + masterCount int + // "Outputs" Handler http.Handler + + readOnlyServer string + readWriteServer string + masterServices *util.Runner } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -108,8 +129,60 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil } +// setDefaults fills in any fields not set that are required to have valid data. +func setDefaults(c *Config) { + if c.PortalNet == nil { + defaultNet := "10.0.0.0/24" + glog.Warningf("Portal net unspecified. Defaulting to %v.", defaultNet) + _, portalNet, err := net.ParseCIDR(defaultNet) + if err != nil { + glog.Fatalf("Unable to parse CIDR: %v", err) + } + c.PortalNet = portalNet + } + if c.MasterCount == 0 { + // Clearly, there will be at least one master. + c.MasterCount = 1 + } + if c.ReadOnlyPort == 0 { + c.ReadOnlyPort = 7080 + } + if c.ReadWritePort == 0 { + c.ReadWritePort = 443 + } + if c.PublicAddress == "" { + // Find and use the first non-loopback address. + // TODO: potentially it'd be useful to skip the docker interface if it + // somehow is first in the list. + addrs, err := net.InterfaceAddrs() + if err != nil { + glog.Fatalf("Unable to get network interfaces: error='%v'", err) + } + found := false + for i := range addrs { + ip, _, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + glog.Errorf("Error parsing '%v': %v", addrs[i], err) + continue + } + if ip.IsLoopback() { + glog.Infof("'%v' (%v) is a loopback address, ignoring.", ip, addrs[i]) + continue + } + found = true + c.PublicAddress = ip.String() + glog.Infof("Will report %v as public IP address.", ip) + break + } + if !found { + glog.Fatalf("Unable to find suitible network address in list: %v", addrs) + } + } +} + // New returns a new instance of Master connected to the given etcd server. func New(c *Config) *Master { + setDefaults(c) minionRegistry := makeMinionRegistry(c) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) boundPodFactory := &pod.BasicBoundPodFactory{ @@ -131,7 +204,11 @@ func New(c *Config) *Master { apiPrefix: c.APIPrefix, corsAllowedOriginList: c.CorsAllowedOriginList, tokenAuthFile: c.TokenAuthFile, + masterCount: c.MasterCount, + readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), + readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))), } + m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop) m.init(c) return m } @@ -188,6 +265,7 @@ func (m *Master) init(c *Config) { // TODO: should appear only in scheduler API group. "bindings": binding.NewREST(m.bindingRegistry), } + apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(m.mux, c.APIPrefix+"/v1beta1") apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(m.mux, c.APIPrefix+"/v1beta2") versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2") @@ -216,6 +294,9 @@ func (m *Master) init(c *Config) { m.mux.HandleFunc("/_whoami", handleWhoAmI(authenticator)) m.Handler = handler + + // TODO: Attempt clean shutdown? + m.masterServices.Start() } // API_v1beta1 returns the resources and codec for API version v1beta1. diff --git a/pkg/master/publish.go b/pkg/master/publish.go new file mode 100644 index 00000000000..83d3447ca06 --- /dev/null +++ b/pkg/master/publish.go @@ -0,0 +1,138 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + + "github.com/golang/glog" +) + +func (m *Master) serviceWriterLoop(stop chan struct{}) { + for { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. + // TODO: add endpoints of all replicas, not just the elected master. + if m.readWriteServer != "" { + if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil { + glog.Errorf("Can't create rw service: %v", err) + } + if err := m.ensureEndpointsContain("kubernetes", m.readWriteServer); err != nil { + glog.Errorf("Can't create rw endpoints: %v", err) + } + } + + select { + case <-stop: + return + case <-time.After(10 * time.Second): + } + } +} + +func (m *Master) roServiceWriterLoop(stop chan struct{}) { + for { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. + if m.readOnlyServer != "" { + if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil { + glog.Errorf("Can't create ro service: %v", err) + } + if err := m.ensureEndpointsContain("kubernetes-ro", m.readOnlyServer); err != nil { + glog.Errorf("Can't create ro endpoints: %v", err) + } + } + + select { + case <-stop: + return + case <-time.After(10 * time.Second): + } + } +} + +// createMasterServiceIfNeeded will create the specified service if it +// doesn't already exist. +func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error { + ctx := api.NewDefaultContext() + if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil { + // The service already exists. + return nil + } + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: serviceName, + Namespace: "default", + }, + Port: port, + // We're going to add the endpoints by hand, so this selector is mainly to + // prevent identification of other pods. This selector will be useful when + // we start hosting apiserver in a pod. + Selector: map[string]string{"provider": "kubernetes", "component": "apiserver"}, + } + // Kids, don't do this at home: this is a hack. There's no good way to call the business + // logic which lives in the REST object from here. + c, err := m.storage["services"].Create(ctx, svc) + if err != nil { + return err + } + resp := <-c + if _, ok := resp.Object.(*api.Service); ok { + // If all worked, we get back an *api.Service object. + return nil + } + return fmt.Errorf("Unexpected response: %#v", resp) +} + +// ensureEndpointsContain sets the endpoints for the given service. Also removes +// excess endpoints (as determined by m.masterCount). Extra endpoints could appear +// in the list if, for example, the master starts running on a different machine, +// changing IP addresses. +func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error { + ctx := api.NewDefaultContext() + e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) + if err != nil { + e = &api.Endpoints{} + // Fill in ID if it didn't exist already + e.ObjectMeta.Name = serviceName + e.ObjectMeta.Namespace = "default" + } + found := false + for i := range e.Endpoints { + if e.Endpoints[i] == endpoint { + found = true + break + } + } + if !found { + e.Endpoints = append(e.Endpoints, endpoint) + } + if len(e.Endpoints) > m.masterCount { + // We append to the end and remove from the beginning, so this should + // converge rapidly with all masters performing this operation. + e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:] + } else if found { + // We didn't make any changes, no need to actually call update. + return nil + } + return m.endpointRegistry.UpdateEndpoints(ctx, e) +} diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 6aefe50839d..8bd1652caf6 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -48,6 +48,9 @@ type REST struct { func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST { // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) ipa := newIPAllocator(portalNet) + if ipa == nil { + glog.Fatalf("Failed to create an IP allocator. Is subnet '%v' valid?", portalNet) + } reloadIPsFromStorage(ipa, registry) return &REST{ diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 75fb80dff8e..f76cec32aa3 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -51,6 +51,12 @@ func (e *EndpointController) SyncServiceEndpoints() error { } var resultErr error for _, service := range services.Items { + if service.Name == "kubernetes" || service.Name == "kubernetes-ro" { + // This is a temporary hack for supporting the master services + // until we actually start running apiserver in a pod. + continue + } + glog.Infof("About to update endpoints for service %v", service.Name) pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Selector).AsSelector()) if err != nil { glog.Errorf("Error syncing service: %#v, skipping.", service) diff --git a/pkg/util/runner.go b/pkg/util/runner.go new file mode 100644 index 00000000000..85cd7e21975 --- /dev/null +++ b/pkg/util/runner.go @@ -0,0 +1,58 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" +) + +// Runner is an abstraction to make it easy to start and stop groups of things that can be +// described by a single function which waits on a channel close to exit. +type Runner struct { + lock sync.Mutex + loopFuncs []func(stop chan struct{}) + stop *chan struct{} +} + +// NewRunner makes a runner for the given function(s). The function(s) should loop until +// the channel is closed. +func NewRunner(f ...func(stop chan struct{})) *Runner { + return &Runner{loopFuncs: f} +} + +// Start begins running. +func (r *Runner) Start() { + r.lock.Lock() + defer r.lock.Unlock() + if r.stop == nil { + c := make(chan struct{}) + r.stop = &c + for i := range r.loopFuncs { + go r.loopFuncs[i](*r.stop) + } + } +} + +// Stop stops running. +func (r *Runner) Stop() { + r.lock.Lock() + defer r.lock.Unlock() + if r.stop != nil { + close(*r.stop) + r.stop = nil + } +} diff --git a/pkg/util/runner_test.go b/pkg/util/runner_test.go new file mode 100644 index 00000000000..0639edc52d2 --- /dev/null +++ b/pkg/util/runner_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "sync" + "testing" +) + +func TestRunner(t *testing.T) { + var ( + lock sync.Mutex + events []string + funcs []func(chan struct{}) + ) + done := make(chan struct{}, 20) + for i := 0; i < 10; i++ { + iCopy := i + funcs = append(funcs, func(c chan struct{}) { + lock.Lock() + events = append(events, fmt.Sprintf("%v starting\n", iCopy)) + lock.Unlock() + <-c + lock.Lock() + events = append(events, fmt.Sprintf("%v stopping\n", iCopy)) + lock.Unlock() + done <- struct{}{} + }) + } + + r := NewRunner(funcs...) + r.Start() + r.Stop() + for i := 0; i < 10; i++ { + <-done + } + if len(events) != 20 { + t.Errorf("expected 20 events, but got:\n%v\n", events) + } +} diff --git a/test/integration/client_test.go b/test/integration/client_test.go index a8de250deca..821564f8504 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -41,6 +41,7 @@ func TestClient(t *testing.T) { t.Fatalf("unexpected error: %v", err) } mux := http.NewServeMux() + master.New(&master.Config{ EtcdHelper: helper, Mux: mux,