Separate minion controller from master.

This commit is contained in:
Deyuan Deng
2014-10-21 21:21:44 -04:00
parent 41f0929384
commit 019b7fc74c
12 changed files with 267 additions and 291 deletions

View File

@@ -21,8 +21,8 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
@@ -31,31 +31,38 @@ type MinionController struct {
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
registry minion.Registry
period time.Duration
minions []string
kubeClient client.Interface
}
// NewMinionController returns a new minion controller to sync instances from cloudprovider.
func NewMinionController(
cloud cloudprovider.Interface,
matchRE string,
minions []string,
staticResources *api.NodeResources,
registry minion.Registry,
period time.Duration) *MinionController {
kubeClient client.Interface) *MinionController {
return &MinionController{
cloud: cloud,
matchRE: matchRE,
minions: minions,
staticResources: staticResources,
registry: registry,
period: period,
kubeClient: kubeClient,
}
}
// Run starts syncing instances from cloudprovider periodically.
func (s *MinionController) Run() {
// Call Sync() first to warm up minion registry.
s.Sync()
go util.Forever(func() { s.Sync() }, s.period)
// Run starts syncing instances from cloudprovider periodically, or create initial minion list.
func (s *MinionController) Run(period time.Duration) {
if s.cloud != nil && len(s.matchRE) > 0 {
go util.Forever(func() { s.Sync() }, period)
} else {
for _, minionID := range s.minions {
s.kubeClient.Minions().Create(&api.Minion{
ObjectMeta: api.ObjectMeta{Name: minionID},
NodeResources: *s.staticResources,
})
}
}
}
// Sync syncs list of instances from cloudprovider to master etcd registry.
@@ -64,7 +71,7 @@ func (s *MinionController) Sync() error {
if err != nil {
return err
}
minions, err := s.registry.ListMinions(nil)
minions, err := s.kubeClient.Minions().List()
if err != nil {
return err
}
@@ -77,20 +84,14 @@ func (s *MinionController) Sync() error {
for _, minion := range matches.Items {
if _, ok := minionMap[minion.Name]; !ok {
glog.Infof("Create minion in registry: %s", minion.Name)
err = s.registry.CreateMinion(nil, &minion)
if err != nil {
return err
}
s.kubeClient.Minions().Create(&minion)
}
delete(minionMap, minion.Name)
}
for minionID := range minionMap {
glog.Infof("Delete minion from registry: %s", minionID)
err = s.registry.DeleteMinion(nil, minionID)
if err != nil {
return err
}
s.kubeClient.Minions().Delete(minionID)
}
return nil
}

View File

@@ -17,161 +17,103 @@ limitations under the License.
package controller
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
etcdregistry "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
etcd "github.com/coreos/go-etcd/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry {
registry := etcdregistry.NewRegistry(
tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicBoundPodFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
},
)
return registry
func newMinionList(count int) api.MinionList {
minions := []api.Minion{}
for i := 0; i < count; i++ {
minions = append(minions, api.Minion{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("minion%d", i),
},
})
}
return api.MinionList{
Items: minions,
}
}
type serverResponse struct {
statusCode int
obj interface{}
}
func makeTestServer(t *testing.T, minionResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
fakeMinionHandler := util.FakeHandler{
StatusCode: minionResponse.statusCode,
ResponseBody: util.EncodeJSON(minionResponse.obj),
}
mux := http.NewServeMux()
mux.Handle("/api/"+testapi.Version()+"/minions", &fakeMinionHandler)
mux.Handle("/api/"+testapi.Version()+"/minions/", &fakeMinionHandler)
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound)
})
return httptest.NewServer(mux), &fakeMinionHandler
}
func TestSyncCreateMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
m1 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m1"}})
m2 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m2"}})
fakeClient.Set("/registry/minions/m1", m1, 0)
fakeClient.Set("/registry/minions/m2", m2, 0)
fakeClient.ExpectNotFoundGet("/registry/minions/m3")
fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{Value: m1},
{Value: m2},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
instances := []string{"m1", "m2", "m3"}
testServer, minionHandler := makeTestServer(t,
serverResponse{http.StatusOK, newMinionList(1)})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
instances := []string{"minion0", "minion1"}
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
}
minionController := NewMinionController(&fakeCloud, ".*", nil, registry, time.Second)
minion, err := registry.GetMinion(ctx, "m3")
if minion != nil {
t.Errorf("Unexpected contains")
}
err = minionController.Sync()
if err != nil {
minionController := NewMinionController(&fakeCloud, ".*", nil, nil, client)
if err := minionController.Sync(); err != nil {
t.Errorf("unexpected error: %v", err)
}
minion, err = registry.GetMinion(ctx, "m3")
if minion == nil {
t.Errorf("Unexpected !contains")
}
data := runtime.EncodeOrDie(testapi.Codec(), &api.Minion{ObjectMeta: api.ObjectMeta{Name: "minion1"}})
minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions", "POST", &data)
}
func TestSyncDeleteMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
m1 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m1"}})
m2 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m2"}})
m3 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m3"}})
fakeClient.Set("/registry/minions/m1", m1, 0)
fakeClient.Set("/registry/minions/m2", m2, 0)
fakeClient.Set("/registry/minions/m3", m3, 0)
fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{Value: m1},
{Value: m2},
{Value: m3},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
instances := []string{"m1", "m2"}
testServer, minionHandler := makeTestServer(t,
serverResponse{http.StatusOK, newMinionList(2)})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
instances := []string{"minion0"}
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
}
minionController := NewMinionController(&fakeCloud, ".*", nil, registry, time.Second)
minion, err := registry.GetMinion(ctx, "m3")
if minion == nil {
t.Errorf("Unexpected !contains")
}
err = minionController.Sync()
if err != nil {
minionController := NewMinionController(&fakeCloud, ".*", nil, nil, client)
if err := minionController.Sync(); err != nil {
t.Errorf("unexpected error: %v", err)
}
minion, err = registry.GetMinion(ctx, "m3")
if minion != nil {
t.Errorf("Unexpected contains")
}
minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions/minion1", "DELETE", nil)
}
func TestSyncMinionRegexp(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
instances := []string{"m1", "m2", "n1", "n2"}
testServer, minionHandler := makeTestServer(t,
serverResponse{http.StatusOK, newMinionList(1)})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
instances := []string{"minion0", "minion1", "node0"}
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
}
minionController := NewMinionController(&fakeCloud, "m[0-9]+", nil, registry, time.Second)
err := minionController.Sync()
if err != nil {
minionController := NewMinionController(&fakeCloud, "minion[0-9]+", nil, nil, client)
if err := minionController.Sync(); err != nil {
t.Errorf("unexpected error: %v", err)
}
var minion *api.Minion
fakeClient.ExpectNotFoundGet("/registry/minions/n1")
fakeClient.ExpectNotFoundGet("/registry/minions/n2")
minion, err = registry.GetMinion(ctx, "m1")
if minion == nil {
t.Errorf("Unexpected !contains")
}
minion, err = registry.GetMinion(ctx, "m2")
if minion == nil {
t.Errorf("Unexpected !contains")
}
minion, err = registry.GetMinion(ctx, "n1")
if minion != nil {
t.Errorf("Unexpected !contains")
}
minion, err = registry.GetMinion(ctx, "n2")
if minion != nil {
t.Errorf("Unexpected !contains")
}
// Only minion1 is created.
data := runtime.EncodeOrDie(testapi.Codec(), &api.Minion{ObjectMeta: api.ObjectMeta{Name: "minion1"}})
minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions", "POST", &data)
}