plumb stopch to post start hook index since many of them are starting go funcs

This commit is contained in:
deads2k
2017-05-08 11:05:28 -04:00
parent 640373da10
commit be39283923
41 changed files with 254 additions and 224 deletions

View File

@@ -98,6 +98,8 @@ type MasterComponents struct {
ClientSet clientset.Interface
// Replication controller manager
ControllerManager *replicationcontroller.ReplicationManager
// CloseFn shuts down the server
CloseFn CloseFunc
// Channel for stop signals to rc manager
rcStopCh chan struct{}
// Used to stop master components individually, and via MasterComponents.Stop
@@ -118,7 +120,7 @@ type Config struct {
// NewMasterComponents creates, initializes and starts master components based on the given config.
func NewMasterComponents(c *Config) *MasterComponents {
m, s := startMasterOrDie(c.MasterConfig, nil, nil)
m, s, closeFn := startMasterOrDie(c.MasterConfig, nil, nil)
// TODO: Allow callers to pipe through a different master url and create a client/start components using it.
glog.Infof("Master %+v", s.URL)
// TODO: caesarxuchao: remove this client when the refactoring of client libraray is done.
@@ -138,6 +140,7 @@ func NewMasterComponents(c *Config) *MasterComponents {
KubeMaster: m,
ClientSet: clientset,
ControllerManager: controllerManager,
CloseFn: closeFn,
rcStopCh: rcStopCh,
}
}
@@ -173,7 +176,7 @@ func (h *MasterHolder) SetMaster(m *master.Master) {
}
// startMasterOrDie starts a kubernetes master and an httpserver to handle api requests
func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server) {
func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server, CloseFunc) {
var m *master.Master
var s *httptest.Server
@@ -185,6 +188,12 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
}))
}
stopCh := make(chan struct{})
closeFn := func() {
close(stopCh)
s.Close()
}
if masterConfig == nil {
masterConfig = NewMasterConfig()
masterConfig.GenericConfig.EnableProfiling = true
@@ -238,6 +247,7 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
m, err := masterConfig.Complete().New(genericapiserver.EmptyDelegate)
if err != nil {
closeFn()
glog.Fatalf("error in bringing up the master: %v", err)
}
if masterReceiver != nil {
@@ -248,12 +258,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
// this method never actually calls the `Run` method for the API server
// fire the post hooks ourselves
m.GenericAPIServer.PrepareRun()
m.GenericAPIServer.RunPostStartHooks()
m.GenericAPIServer.RunPostStartHooks(stopCh)
cfg := *masterConfig.GenericConfig.LoopbackClientConfig
cfg.ContentConfig.GroupVersion = &schema.GroupVersion{}
privilegedClient, err := restclient.RESTClientFor(&cfg)
if err != nil {
closeFn()
glog.Fatal(err)
}
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
@@ -266,6 +277,7 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
return false, nil
})
if err != nil {
closeFn()
glog.Fatal(err)
}
@@ -275,6 +287,7 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
coreClient := coreclient.NewForConfigOrDie(&cfg)
svcWatch, err := coreClient.Services(metav1.NamespaceDefault).Watch(metav1.ListOptions{})
if err != nil {
closeFn()
glog.Fatal(err)
}
_, err = watch.Until(30*time.Second, svcWatch, func(event watch.Event) (bool, error) {
@@ -287,11 +300,12 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
return false, nil
})
if err != nil {
closeFn()
glog.Fatal(err)
}
}
return m, s
return m, s, closeFn
}
func parseCIDROrDie(cidr string) *net.IPNet {
@@ -402,7 +416,7 @@ func (m *MasterComponents) Stop(apiServer, rcManager bool) {
m.once.Do(m.stopRCManager)
}
if apiServer {
m.ApiServer.Close()
m.CloseFn()
}
}
@@ -467,7 +481,10 @@ func ScaleRC(name, ns string, replicas int32, clientset internalclientset.Interf
return scaled, nil
}
func RunAMaster(masterConfig *master.Config) (*master.Master, *httptest.Server) {
// CloseFunc can be called to cleanup the master
type CloseFunc func()
func RunAMaster(masterConfig *master.Config) (*master.Master, *httptest.Server, CloseFunc) {
if masterConfig == nil {
masterConfig = NewMasterConfig()
masterConfig.GenericConfig.EnableProfiling = true
@@ -476,7 +493,7 @@ func RunAMaster(masterConfig *master.Config) (*master.Master, *httptest.Server)
return startMasterOrDie(masterConfig, nil, nil)
}
func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server) {
func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server, CloseFunc) {
return startMasterOrDie(masterConfig, s, masterReceiver)
}