mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #1920 from lavalamp/fix
Publish apiserver services. Remove election framework.
This commit is contained in:
commit
ecdf65f4b1
@ -42,9 +42,23 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
port = flag.Uint("port", 8080, "The port to listen on. Default 8080")
|
||||
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
|
||||
// arrange these text blocks sensibly. Grrr.
|
||||
port = flag.Int("port", 8080, ""+
|
||||
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
|
||||
"set up such that this port is not reachable from outside of the cluster. It is "+
|
||||
"further assumed that port 443 on the cluster's public address is proxied to this "+
|
||||
"port. This is performed by nginx in the default setup.")
|
||||
address = util.IP(net.ParseIP("127.0.0.1"))
|
||||
readOnlyPort = flag.Uint("read_only_port", 7080, "The port from which to serve read-only resources. If 0, don't serve on a read-only address.")
|
||||
publicAddressOverride = flag.String("public_address_override", "", ""+
|
||||
"Public serving address. Read only port will be opened on this address, "+
|
||||
"and it is assumed that port 443 at this address will be proxied/redirected "+
|
||||
"to '-address':'-port'. If blank, the address in the first listed interface "+
|
||||
"will be used.")
|
||||
readOnlyPort = flag.Int("read_only_port", 7080, ""+
|
||||
"The port from which to serve read-only resources. If 0, don't serve on a "+
|
||||
"read-only address. It is assumed that firewall rules are set up such that "+
|
||||
"this port is not reachable from outside of the cluster.")
|
||||
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.")
|
||||
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
|
||||
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
|
||||
@ -184,7 +198,7 @@ func main() {
|
||||
|
||||
n := net.IPNet(portalNet)
|
||||
mux := http.NewServeMux()
|
||||
m := master.New(&master.Config{
|
||||
config := &master.Config{
|
||||
Client: client,
|
||||
Cloud: cloud,
|
||||
EtcdHelper: helper,
|
||||
@ -207,13 +221,26 @@ func main() {
|
||||
APIPrefix: *apiPrefix,
|
||||
CorsAllowedOriginList: corsAllowedOriginList,
|
||||
TokenAuthFile: *tokenAuthFile,
|
||||
})
|
||||
|
||||
ReadOnlyPort: *readOnlyPort,
|
||||
ReadWritePort: *port,
|
||||
PublicAddress: *publicAddressOverride,
|
||||
}
|
||||
m := master.New(config)
|
||||
|
||||
roLocation := ""
|
||||
if *readOnlyPort != 0 {
|
||||
roLocation = net.JoinHostPort(config.PublicAddress, strconv.Itoa(config.ReadOnlyPort))
|
||||
}
|
||||
rwLocation := net.JoinHostPort(address.String(), strconv.Itoa(int(*port)))
|
||||
|
||||
// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.
|
||||
|
||||
if roLocation != "" {
|
||||
// Allow 1 read-only request per second, allow up to 20 in a burst before enforcing.
|
||||
rl := util.NewTokenBucketRateLimiter(1.0, 20)
|
||||
readOnlyServer := &http.Server{
|
||||
Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*readOnlyPort))),
|
||||
Addr: roLocation,
|
||||
Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.Handler))),
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
WriteTimeout: 5 * time.Minute,
|
||||
@ -226,7 +253,7 @@ func main() {
|
||||
}
|
||||
|
||||
s := &http.Server{
|
||||
Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
|
||||
Addr: rwLocation,
|
||||
Handler: apiserver.RecoverPanics(m.Handler),
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
WriteTimeout: 5 * time.Minute,
|
||||
|
@ -98,6 +98,38 @@ func loadClientOrDie() *client.Client {
|
||||
return c
|
||||
}
|
||||
|
||||
func TestKubernetesROService(c *client.Client) bool {
|
||||
svc := api.ServiceList{}
|
||||
err := c.Get().
|
||||
Namespace("default").
|
||||
AbsPath("/api/v1beta1/proxy/services/kubernetes-ro/api/v1beta1/services").
|
||||
Do().
|
||||
Into(&svc)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error listing services using ro service: %v", err)
|
||||
return false
|
||||
}
|
||||
var foundRW, foundRO bool
|
||||
for i := range svc.Items {
|
||||
if svc.Items[i].Name == "kubernetes" {
|
||||
foundRW = true
|
||||
}
|
||||
if svc.Items[i].Name == "kubernetes-ro" {
|
||||
foundRO = true
|
||||
}
|
||||
}
|
||||
if !foundRW {
|
||||
glog.Error("no RW service found")
|
||||
}
|
||||
if !foundRO {
|
||||
glog.Error("no RO service found")
|
||||
}
|
||||
if !foundRW || !foundRO {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func TestPodUpdate(c *client.Client) bool {
|
||||
podClient := c.Pods(api.NamespaceDefault)
|
||||
|
||||
@ -158,7 +190,8 @@ func main() {
|
||||
c := loadClientOrDie()
|
||||
|
||||
tests := []func(c *client.Client) bool{
|
||||
// TODO(brendandburns): fix this test and re-add it: TestPodUpdate,
|
||||
TestKubernetesROService,
|
||||
// TODO(brendandburns): fix this test and re-add it: TestPodUpdate,
|
||||
}
|
||||
|
||||
passed := true
|
||||
|
@ -27,6 +27,8 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -127,9 +129,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
||||
}
|
||||
|
||||
// Master
|
||||
_, portalNet, err := net.ParseCIDR("10.0.0.0/24")
|
||||
host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://"))
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to parse CIDR: %v", err)
|
||||
glog.Fatalf("Unable to parse URL '%v': %v", apiServer.URL, err)
|
||||
}
|
||||
portNumber, err := strconv.Atoi(port)
|
||||
if err != nil {
|
||||
glog.Fatalf("Nonnumeric port? %v", err)
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
// Create a master and install handlers into mux.
|
||||
@ -138,10 +144,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
||||
EtcdHelper: helper,
|
||||
Minions: machineList,
|
||||
KubeletClient: fakeKubeletClient{},
|
||||
PortalNet: portalNet,
|
||||
Mux: mux,
|
||||
EnableLogsSupport: false,
|
||||
APIPrefix: "/api",
|
||||
|
||||
ReadWritePort: portNumber,
|
||||
ReadOnlyPort: portNumber,
|
||||
PublicAddress: host,
|
||||
})
|
||||
handler.delegate = mux
|
||||
|
||||
@ -342,6 +351,68 @@ func runAtomicPutTest(c *client.Client) {
|
||||
glog.Info("Atomic PUTs work.")
|
||||
}
|
||||
|
||||
func runMasterServiceTest(client *client.Client) {
|
||||
time.Sleep(12 * time.Second)
|
||||
var svcList api.ServiceList
|
||||
err := client.Get().
|
||||
Namespace("default").
|
||||
Path("services").
|
||||
Do().
|
||||
Into(&svcList)
|
||||
if err != nil {
|
||||
glog.Fatalf("unexpected error listing services: %v", err)
|
||||
}
|
||||
var foundRW, foundRO bool
|
||||
found := util.StringSet{}
|
||||
for i := range svcList.Items {
|
||||
found.Insert(svcList.Items[i].Name)
|
||||
if svcList.Items[i].Name == "kubernetes" {
|
||||
foundRW = true
|
||||
}
|
||||
if svcList.Items[i].Name == "kubernetes-ro" {
|
||||
foundRO = true
|
||||
}
|
||||
}
|
||||
if foundRW {
|
||||
var ep api.Endpoints
|
||||
err := client.Get().
|
||||
Namespace("default").
|
||||
Path("endpoints").
|
||||
Path("kubernetes").
|
||||
Do().
|
||||
Into(&ep)
|
||||
if err != nil {
|
||||
glog.Fatalf("unexpected error listing endpoints for kubernetes service: %v", err)
|
||||
}
|
||||
if len(ep.Endpoints) == 0 {
|
||||
glog.Fatalf("no endpoints for kubernetes service: %v", ep)
|
||||
}
|
||||
} else {
|
||||
glog.Errorf("no RW service found: %v", found)
|
||||
}
|
||||
if foundRO {
|
||||
var ep api.Endpoints
|
||||
err := client.Get().
|
||||
Namespace("default").
|
||||
Path("endpoints").
|
||||
Path("kubernetes-ro").
|
||||
Do().
|
||||
Into(&ep)
|
||||
if err != nil {
|
||||
glog.Fatalf("unexpected error listing endpoints for kubernetes service: %v", err)
|
||||
}
|
||||
if len(ep.Endpoints) == 0 {
|
||||
glog.Fatalf("no endpoints for kubernetes service: %v", ep)
|
||||
}
|
||||
} else {
|
||||
glog.Errorf("no RO service found: %v", found)
|
||||
}
|
||||
if !foundRW || !foundRO {
|
||||
glog.Fatalf("Kubernetes service test failed: %v", found)
|
||||
}
|
||||
glog.Infof("Master service test passed.")
|
||||
}
|
||||
|
||||
func runServiceTest(client *client.Client) {
|
||||
pod := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
@ -438,6 +509,7 @@ func main() {
|
||||
runAtomicPutTest,
|
||||
runServiceTest,
|
||||
runAPIVersionsTest,
|
||||
runMasterServiceTest,
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(testFuncs))
|
||||
|
@ -113,10 +113,16 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
location, err := redirector.ResourceLocation(ctx, id)
|
||||
if err != nil {
|
||||
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
|
||||
status := errToAPIStatus(err)
|
||||
writeJSON(status.Code, r.codec, status, w)
|
||||
return
|
||||
}
|
||||
if location == "" {
|
||||
httplog.LogOf(req, w).Addf("ResourceLocation for %v returned ''", id)
|
||||
notFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
destURL, err := url.Parse(location)
|
||||
if err != nil {
|
||||
@ -124,11 +130,19 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
writeJSON(status.Code, r.codec, status, w)
|
||||
return
|
||||
}
|
||||
if destURL.Scheme == "" {
|
||||
// If no scheme was present in location, url.Parse sometimes mistakes
|
||||
// hosts for paths.
|
||||
destURL.Host = location
|
||||
}
|
||||
destURL.Path = rest
|
||||
destURL.RawQuery = req.URL.RawQuery
|
||||
newReq, err := http.NewRequest(req.Method, destURL.String(), req.Body)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create request: %s", err)
|
||||
status := errToAPIStatus(err)
|
||||
writeJSON(status.Code, r.codec, status, w)
|
||||
notFound(w, req)
|
||||
return
|
||||
}
|
||||
newReq.Header = req.Header
|
||||
|
||||
|
@ -30,7 +30,11 @@ type RedirectHandler struct {
|
||||
}
|
||||
|
||||
func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
namespace := req.URL.Query().Get("namespace")
|
||||
if len(namespace) > 0 {
|
||||
ctx = api.WithNamespace(ctx, namespace)
|
||||
}
|
||||
parts := splitPath(req.URL.Path)
|
||||
if len(parts) != 2 || req.Method != "GET" {
|
||||
notFound(w, req)
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
func TestRedirect(t *testing.T) {
|
||||
simpleStorage := &SimpleRESTStorage{
|
||||
errors: map[string]error{},
|
||||
expectedResourceNamespace: "default",
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"foo": simpleStorage,
|
||||
|
@ -1,18 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package election provides interfaces used for master election.
|
||||
package election
|
@ -1,185 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package election
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Master is used to announce the current elected master.
|
||||
type Master string
|
||||
|
||||
// IsAnAPIObject is used solely so we can work with the watch package.
|
||||
// TODO: Either fix watch so this isn't necessary, or make this a real API Object.
|
||||
// TODO: when it becomes clear how this package will be used, move these declarations to
|
||||
// to the proper place.
|
||||
func (Master) IsAnAPIObject() {}
|
||||
|
||||
// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
|
||||
func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector {
|
||||
return &etcdMasterElector{etcd: h}
|
||||
}
|
||||
|
||||
type empty struct{}
|
||||
|
||||
// internal implementation struct
|
||||
type etcdMasterElector struct {
|
||||
etcd tools.EtcdGetSet
|
||||
done chan empty
|
||||
events chan watch.Event
|
||||
}
|
||||
|
||||
// Elect implements the election.MasterElector interface.
|
||||
func (e *etcdMasterElector) Elect(path, id string) watch.Interface {
|
||||
e.done = make(chan empty)
|
||||
e.events = make(chan watch.Event)
|
||||
go util.Forever(func() { e.run(path, id) }, time.Second*5)
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *etcdMasterElector) run(path, id string) {
|
||||
masters := make(chan string)
|
||||
errors := make(chan error)
|
||||
go e.master(path, id, 30, masters, errors, e.done)
|
||||
for {
|
||||
select {
|
||||
case m := <-masters:
|
||||
e.events <- watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: Master(m),
|
||||
}
|
||||
case e := <-errors:
|
||||
glog.Errorf("error in election: %v", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ResultChan implements the watch.Interface interface.
|
||||
func (e *etcdMasterElector) ResultChan() <-chan watch.Event {
|
||||
return e.events
|
||||
}
|
||||
|
||||
// extendMaster attempts to extend ownership of a master lock for TTL seconds.
|
||||
// returns "", nil if extension failed
|
||||
// returns id, nil if extension succeeded
|
||||
// returns "", err if an error occurred
|
||||
func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.Response) (string, error) {
|
||||
// If it matches the passed in id, extend the lease by writing a new entry.
|
||||
// Uses compare and swap, so that if we TTL out in the meantime, the write will fail.
|
||||
// We don't handle the TTL delete w/o a write case here, it's handled in the next loop
|
||||
// iteration.
|
||||
_, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex)
|
||||
if err != nil && !tools.IsEtcdTestFailed(err) {
|
||||
return "", err
|
||||
}
|
||||
if err != nil && tools.IsEtcdTestFailed(err) {
|
||||
return "", nil
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// becomeMaster attempts to become the master for this lock.
|
||||
// returns "", nil if the attempt failed
|
||||
// returns id, nil if the attempt succeeded
|
||||
// returns "", err if an error occurred
|
||||
func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) {
|
||||
_, err := e.etcd.Create(path, id, ttl)
|
||||
if err != nil && !tools.IsEtcdNodeExist(err) {
|
||||
// unexpected error
|
||||
return "", err
|
||||
}
|
||||
if err != nil && tools.IsEtcdNodeExist(err) {
|
||||
return "", nil
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// handleMaster performs one loop of master locking.
|
||||
// on success it returns <master>, nil
|
||||
// on error it returns "", err
|
||||
// in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock)
|
||||
// it returns "", nil
|
||||
func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) {
|
||||
res, err := e.etcd.Get(path, false, false)
|
||||
|
||||
// Unexpected error, bail out
|
||||
if err != nil && !tools.IsEtcdNotFound(err) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// There is no master, try to become the master.
|
||||
if err != nil && tools.IsEtcdNotFound(err) {
|
||||
return e.becomeMaster(path, id, ttl)
|
||||
}
|
||||
|
||||
// This should never happen.
|
||||
if res.Node == nil {
|
||||
return "", fmt.Errorf("unexpected response: %#v", res)
|
||||
}
|
||||
|
||||
// We're not the master, just return the current value
|
||||
if res.Node.Value != id {
|
||||
return res.Node.Value, nil
|
||||
}
|
||||
|
||||
// We are the master, try to extend out lease
|
||||
return e.extendMaster(path, id, ttl, res)
|
||||
}
|
||||
|
||||
// master provices a distributed master election lock, maintains lock until failure, or someone sends something in the done channel.
|
||||
// The basic algorithm is:
|
||||
// while !done
|
||||
// Get the current master
|
||||
// If there is no current master
|
||||
// Try to become the master
|
||||
// Otherwise
|
||||
// If we are the master, extend the lease
|
||||
// If the master is different than the last time through the loop, report the master
|
||||
// Sleep 80% of TTL
|
||||
func (e *etcdMasterElector) master(path, id string, ttl uint64, masters chan<- string, errors chan<- error, done <-chan empty) {
|
||||
lastMaster := ""
|
||||
for {
|
||||
master, err := e.handleMaster(path, id, ttl)
|
||||
if err != nil {
|
||||
errors <- err
|
||||
} else if len(master) == 0 {
|
||||
continue
|
||||
} else if master != lastMaster {
|
||||
lastMaster = master
|
||||
masters <- master
|
||||
}
|
||||
// TODO: Add Watch here, skip the polling for faster reactions
|
||||
// If done is closed, break out.
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-time.After(time.Duration((ttl*8)/10) * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ResultChan implements the watch.Interface interface
|
||||
func (e *etcdMasterElector) Stop() {
|
||||
close(e.done)
|
||||
}
|
@ -1,98 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package election
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
func TestEtcdMasterOther(t *testing.T) {
|
||||
path := "foo"
|
||||
etcd := tools.NewFakeEtcdClient(t)
|
||||
etcd.Set(path, "baz", 0)
|
||||
master := NewEtcdMasterElector(etcd)
|
||||
w := master.Elect(path, "bar")
|
||||
result := <-w.ResultChan()
|
||||
if result.Type != watch.Modified || result.Object.(Master) != "baz" {
|
||||
t.Errorf("unexpected event: %#v", result)
|
||||
}
|
||||
w.Stop()
|
||||
}
|
||||
|
||||
func TestEtcdMasterNoOther(t *testing.T) {
|
||||
path := "foo"
|
||||
e := tools.NewFakeEtcdClient(t)
|
||||
e.TestIndex = true
|
||||
e.Data["foo"] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
E: &etcd.EtcdError{
|
||||
ErrorCode: tools.EtcdErrorCodeNotFound,
|
||||
},
|
||||
}
|
||||
master := NewEtcdMasterElector(e)
|
||||
w := master.Elect(path, "bar")
|
||||
result := <-w.ResultChan()
|
||||
if result.Type != watch.Modified || result.Object.(Master) != "bar" {
|
||||
t.Errorf("unexpected event: %#v", result)
|
||||
}
|
||||
w.Stop()
|
||||
}
|
||||
|
||||
func TestEtcdMasterNoOtherThenConflict(t *testing.T) {
|
||||
path := "foo"
|
||||
e := tools.NewFakeEtcdClient(t)
|
||||
e.TestIndex = true
|
||||
// Ok, so we set up a chain of responses from etcd:
|
||||
// 1) Nothing there
|
||||
// 2) conflict (someone else wrote)
|
||||
// 3) new value (the data they wrote)
|
||||
empty := tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
E: &etcd.EtcdError{
|
||||
ErrorCode: tools.EtcdErrorCodeNotFound,
|
||||
},
|
||||
}
|
||||
empty.N = &tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: &etcd.EtcdError{
|
||||
ErrorCode: tools.EtcdErrorCodeNodeExist,
|
||||
},
|
||||
}
|
||||
empty.N.N = &tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Value: "baz",
|
||||
},
|
||||
},
|
||||
}
|
||||
e.Data["foo"] = empty
|
||||
master := NewEtcdMasterElector(e)
|
||||
w := master.Elect(path, "bar")
|
||||
result := <-w.ResultChan()
|
||||
if result.Type != watch.Modified || result.Object.(Master) != "bar" {
|
||||
t.Errorf("unexpected event: %#v", result)
|
||||
}
|
||||
w.Stop()
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package election
|
||||
|
||||
import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// MasterElector is an interface for services that can elect masters.
|
||||
// Important Note: MasterElectors are not inter-operable, all participants in the election need to be
|
||||
// using the same underlying implementation of this interface for correct behavior.
|
||||
type MasterElector interface {
|
||||
// RequestMaster makes the caller represented by 'id' enter into a master election for the
|
||||
// distributed lock defined by 'path'
|
||||
// The returned watch.Interface provides a stream of Master objects which
|
||||
// contain the current master.
|
||||
// Calling Stop on the returned interface relinquishes ownership (if currently possesed)
|
||||
// and removes the caller from the election
|
||||
Elect(path, id string) watch.Interface
|
||||
}
|
@ -19,6 +19,7 @@ package master
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -70,6 +71,20 @@ type Config struct {
|
||||
APIPrefix string
|
||||
CorsAllowedOriginList util.StringList
|
||||
TokenAuthFile string
|
||||
|
||||
// Number of masters running; all masters must be started with the
|
||||
// same value for this field. (Numbers > 1 currently untested.)
|
||||
MasterCount int
|
||||
|
||||
// The port on PublicAddress where a read-only server will be installed.
|
||||
// Defaults to 7080 if not set.
|
||||
ReadOnlyPort int
|
||||
// The port on PublicAddress where a read-write server will be installed.
|
||||
// Defaults to 443 if not set.
|
||||
ReadWritePort int
|
||||
|
||||
// If empty, the first result from net.InterfaceAddrs will be used.
|
||||
PublicAddress string
|
||||
}
|
||||
|
||||
// Master contains state for a Kubernetes cluster master/api server.
|
||||
@ -91,8 +106,14 @@ type Master struct {
|
||||
apiPrefix string
|
||||
corsAllowedOriginList util.StringList
|
||||
tokenAuthFile string
|
||||
masterCount int
|
||||
|
||||
// "Outputs"
|
||||
Handler http.Handler
|
||||
|
||||
readOnlyServer string
|
||||
readWriteServer string
|
||||
masterServices *util.Runner
|
||||
}
|
||||
|
||||
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
|
||||
@ -108,8 +129,60 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
|
||||
return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil
|
||||
}
|
||||
|
||||
// setDefaults fills in any fields not set that are required to have valid data.
|
||||
func setDefaults(c *Config) {
|
||||
if c.PortalNet == nil {
|
||||
defaultNet := "10.0.0.0/24"
|
||||
glog.Warningf("Portal net unspecified. Defaulting to %v.", defaultNet)
|
||||
_, portalNet, err := net.ParseCIDR(defaultNet)
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to parse CIDR: %v", err)
|
||||
}
|
||||
c.PortalNet = portalNet
|
||||
}
|
||||
if c.MasterCount == 0 {
|
||||
// Clearly, there will be at least one master.
|
||||
c.MasterCount = 1
|
||||
}
|
||||
if c.ReadOnlyPort == 0 {
|
||||
c.ReadOnlyPort = 7080
|
||||
}
|
||||
if c.ReadWritePort == 0 {
|
||||
c.ReadWritePort = 443
|
||||
}
|
||||
if c.PublicAddress == "" {
|
||||
// Find and use the first non-loopback address.
|
||||
// TODO: potentially it'd be useful to skip the docker interface if it
|
||||
// somehow is first in the list.
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to get network interfaces: error='%v'", err)
|
||||
}
|
||||
found := false
|
||||
for i := range addrs {
|
||||
ip, _, err := net.ParseCIDR(addrs[i].String())
|
||||
if err != nil {
|
||||
glog.Errorf("Error parsing '%v': %v", addrs[i], err)
|
||||
continue
|
||||
}
|
||||
if ip.IsLoopback() {
|
||||
glog.Infof("'%v' (%v) is a loopback address, ignoring.", ip, addrs[i])
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
c.PublicAddress = ip.String()
|
||||
glog.Infof("Will report %v as public IP address.", ip)
|
||||
break
|
||||
}
|
||||
if !found {
|
||||
glog.Fatalf("Unable to find suitible network address in list: %v", addrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// New returns a new instance of Master connected to the given etcd server.
|
||||
func New(c *Config) *Master {
|
||||
setDefaults(c)
|
||||
minionRegistry := makeMinionRegistry(c)
|
||||
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
|
||||
boundPodFactory := &pod.BasicBoundPodFactory{
|
||||
@ -131,7 +204,11 @@ func New(c *Config) *Master {
|
||||
apiPrefix: c.APIPrefix,
|
||||
corsAllowedOriginList: c.CorsAllowedOriginList,
|
||||
tokenAuthFile: c.TokenAuthFile,
|
||||
masterCount: c.MasterCount,
|
||||
readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))),
|
||||
readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))),
|
||||
}
|
||||
m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop)
|
||||
m.init(c)
|
||||
return m
|
||||
}
|
||||
@ -188,6 +265,7 @@ func (m *Master) init(c *Config) {
|
||||
// TODO: should appear only in scheduler API group.
|
||||
"bindings": binding.NewREST(m.bindingRegistry),
|
||||
}
|
||||
|
||||
apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(m.mux, c.APIPrefix+"/v1beta1")
|
||||
apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(m.mux, c.APIPrefix+"/v1beta2")
|
||||
versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2")
|
||||
@ -216,6 +294,9 @@ func (m *Master) init(c *Config) {
|
||||
m.mux.HandleFunc("/_whoami", handleWhoAmI(authenticator))
|
||||
|
||||
m.Handler = handler
|
||||
|
||||
// TODO: Attempt clean shutdown?
|
||||
m.masterServices.Start()
|
||||
}
|
||||
|
||||
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
||||
|
138
pkg/master/publish.go
Normal file
138
pkg/master/publish.go
Normal file
@ -0,0 +1,138 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package master
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
func (m *Master) serviceWriterLoop(stop chan struct{}) {
|
||||
for {
|
||||
// Update service & endpoint records.
|
||||
// TODO: when it becomes possible to change this stuff,
|
||||
// stop polling and start watching.
|
||||
// TODO: add endpoints of all replicas, not just the elected master.
|
||||
if m.readWriteServer != "" {
|
||||
if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil {
|
||||
glog.Errorf("Can't create rw service: %v", err)
|
||||
}
|
||||
if err := m.ensureEndpointsContain("kubernetes", m.readWriteServer); err != nil {
|
||||
glog.Errorf("Can't create rw endpoints: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-time.After(10 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Master) roServiceWriterLoop(stop chan struct{}) {
|
||||
for {
|
||||
// Update service & endpoint records.
|
||||
// TODO: when it becomes possible to change this stuff,
|
||||
// stop polling and start watching.
|
||||
if m.readOnlyServer != "" {
|
||||
if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil {
|
||||
glog.Errorf("Can't create ro service: %v", err)
|
||||
}
|
||||
if err := m.ensureEndpointsContain("kubernetes-ro", m.readOnlyServer); err != nil {
|
||||
glog.Errorf("Can't create ro endpoints: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-time.After(10 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createMasterServiceIfNeeded will create the specified service if it
|
||||
// doesn't already exist.
|
||||
func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error {
|
||||
ctx := api.NewDefaultContext()
|
||||
if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil {
|
||||
// The service already exists.
|
||||
return nil
|
||||
}
|
||||
svc := &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: serviceName,
|
||||
Namespace: "default",
|
||||
},
|
||||
Port: port,
|
||||
// We're going to add the endpoints by hand, so this selector is mainly to
|
||||
// prevent identification of other pods. This selector will be useful when
|
||||
// we start hosting apiserver in a pod.
|
||||
Selector: map[string]string{"provider": "kubernetes", "component": "apiserver"},
|
||||
}
|
||||
// Kids, don't do this at home: this is a hack. There's no good way to call the business
|
||||
// logic which lives in the REST object from here.
|
||||
c, err := m.storage["services"].Create(ctx, svc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp := <-c
|
||||
if _, ok := resp.Object.(*api.Service); ok {
|
||||
// If all worked, we get back an *api.Service object.
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Unexpected response: %#v", resp)
|
||||
}
|
||||
|
||||
// ensureEndpointsContain sets the endpoints for the given service. Also removes
|
||||
// excess endpoints (as determined by m.masterCount). Extra endpoints could appear
|
||||
// in the list if, for example, the master starts running on a different machine,
|
||||
// changing IP addresses.
|
||||
func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error {
|
||||
ctx := api.NewDefaultContext()
|
||||
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
|
||||
if err != nil {
|
||||
e = &api.Endpoints{}
|
||||
// Fill in ID if it didn't exist already
|
||||
e.ObjectMeta.Name = serviceName
|
||||
e.ObjectMeta.Namespace = "default"
|
||||
}
|
||||
found := false
|
||||
for i := range e.Endpoints {
|
||||
if e.Endpoints[i] == endpoint {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
e.Endpoints = append(e.Endpoints, endpoint)
|
||||
}
|
||||
if len(e.Endpoints) > m.masterCount {
|
||||
// We append to the end and remove from the beginning, so this should
|
||||
// converge rapidly with all masters performing this operation.
|
||||
e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:]
|
||||
} else if found {
|
||||
// We didn't make any changes, no need to actually call update.
|
||||
return nil
|
||||
}
|
||||
return m.endpointRegistry.UpdateEndpoints(ctx, e)
|
||||
}
|
@ -48,6 +48,9 @@ type REST struct {
|
||||
func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST {
|
||||
// TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd)
|
||||
ipa := newIPAllocator(portalNet)
|
||||
if ipa == nil {
|
||||
glog.Fatalf("Failed to create an IP allocator. Is subnet '%v' valid?", portalNet)
|
||||
}
|
||||
reloadIPsFromStorage(ipa, registry)
|
||||
|
||||
return &REST{
|
||||
|
@ -51,6 +51,12 @@ func (e *EndpointController) SyncServiceEndpoints() error {
|
||||
}
|
||||
var resultErr error
|
||||
for _, service := range services.Items {
|
||||
if service.Name == "kubernetes" || service.Name == "kubernetes-ro" {
|
||||
// This is a temporary hack for supporting the master services
|
||||
// until we actually start running apiserver in a pod.
|
||||
continue
|
||||
}
|
||||
glog.Infof("About to update endpoints for service %v", service.Name)
|
||||
pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Selector).AsSelector())
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing service: %#v, skipping.", service)
|
||||
|
58
pkg/util/runner.go
Normal file
58
pkg/util/runner.go
Normal file
@ -0,0 +1,58 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Runner is an abstraction to make it easy to start and stop groups of things that can be
|
||||
// described by a single function which waits on a channel close to exit.
|
||||
type Runner struct {
|
||||
lock sync.Mutex
|
||||
loopFuncs []func(stop chan struct{})
|
||||
stop *chan struct{}
|
||||
}
|
||||
|
||||
// NewRunner makes a runner for the given function(s). The function(s) should loop until
|
||||
// the channel is closed.
|
||||
func NewRunner(f ...func(stop chan struct{})) *Runner {
|
||||
return &Runner{loopFuncs: f}
|
||||
}
|
||||
|
||||
// Start begins running.
|
||||
func (r *Runner) Start() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
if r.stop == nil {
|
||||
c := make(chan struct{})
|
||||
r.stop = &c
|
||||
for i := range r.loopFuncs {
|
||||
go r.loopFuncs[i](*r.stop)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops running.
|
||||
func (r *Runner) Stop() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
if r.stop != nil {
|
||||
close(*r.stop)
|
||||
r.stop = nil
|
||||
}
|
||||
}
|
55
pkg/util/runner_test.go
Normal file
55
pkg/util/runner_test.go
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRunner(t *testing.T) {
|
||||
var (
|
||||
lock sync.Mutex
|
||||
events []string
|
||||
funcs []func(chan struct{})
|
||||
)
|
||||
done := make(chan struct{}, 20)
|
||||
for i := 0; i < 10; i++ {
|
||||
iCopy := i
|
||||
funcs = append(funcs, func(c chan struct{}) {
|
||||
lock.Lock()
|
||||
events = append(events, fmt.Sprintf("%v starting\n", iCopy))
|
||||
lock.Unlock()
|
||||
<-c
|
||||
lock.Lock()
|
||||
events = append(events, fmt.Sprintf("%v stopping\n", iCopy))
|
||||
lock.Unlock()
|
||||
done <- struct{}{}
|
||||
})
|
||||
}
|
||||
|
||||
r := NewRunner(funcs...)
|
||||
r.Start()
|
||||
r.Stop()
|
||||
for i := 0; i < 10; i++ {
|
||||
<-done
|
||||
}
|
||||
if len(events) != 20 {
|
||||
t.Errorf("expected 20 events, but got:\n%v\n", events)
|
||||
}
|
||||
}
|
@ -41,6 +41,7 @@ func TestClient(t *testing.T) {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
|
||||
master.New(&master.Config{
|
||||
EtcdHelper: helper,
|
||||
Mux: mux,
|
||||
|
Loading…
Reference in New Issue
Block a user