Merge pull request #8386 from smarterclayton/make_kubelet_consumable

Make it easier to reuse kubelet server code
This commit is contained in:
Victor Marmol 2015-05-18 11:26:55 -07:00
commit a7341cfb77
3 changed files with 29 additions and 18 deletions

View File

@ -242,7 +242,7 @@ func (s *KubeletServer) Run(_ []string) error {
// TODO(vmarmol): Do this through container config. // TODO(vmarmol): Do this through container config.
if err := util.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil { if err := util.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
glog.Info(err) glog.Warning(err)
} }
client, err := s.createAPIServerClient() client, err := s.createAPIServerClient()
@ -250,7 +250,7 @@ func (s *KubeletServer) Run(_ []string) error {
glog.Warningf("No API client: %v", err) glog.Warningf("No API client: %v", err)
} }
glog.Infof("Using root directory: %v", s.RootDirectory) glog.V(2).Infof("Using root directory: %v", s.RootDirectory)
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
@ -269,7 +269,7 @@ func (s *KubeletServer) Run(_ []string) error {
RootFreeDiskMB: s.LowDiskSpaceThresholdMB, RootFreeDiskMB: s.LowDiskSpaceThresholdMB,
} }
cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
glog.Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
hostNetworkSources, err := kubelet.GetValidatedSources(strings.Split(s.HostNetworkSources, ",")) hostNetworkSources, err := kubelet.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
if err != nil { if err != nil {
@ -280,9 +280,9 @@ func (s *KubeletServer) Run(_ []string) error {
s.TLSCertFile = path.Join(s.CertDirectory, "kubelet.crt") s.TLSCertFile = path.Join(s.CertDirectory, "kubelet.crt")
s.TLSPrivateKeyFile = path.Join(s.CertDirectory, "kubelet.key") s.TLSPrivateKeyFile = path.Join(s.CertDirectory, "kubelet.key")
if err := util.GenerateSelfSignedCert(util.GetHostname(s.HostnameOverride), s.TLSCertFile, s.TLSPrivateKeyFile); err != nil { if err := util.GenerateSelfSignedCert(util.GetHostname(s.HostnameOverride), s.TLSCertFile, s.TLSPrivateKeyFile); err != nil {
glog.Fatalf("Unable to generate self signed cert: %v", err) return fmt.Errorf("unable to generate self signed cert: %v", err)
} }
glog.Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile) glog.V(4).Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile)
} }
tlsOptions := &kubelet.TLSOptions{ tlsOptions := &kubelet.TLSOptions{
Config: &tls.Config{ Config: &tls.Config{
@ -297,7 +297,7 @@ func (s *KubeletServer) Run(_ []string) error {
mounter := mount.New() mounter := mount.New()
if s.Containerized { if s.Containerized {
glog.Info("Running kubelet in containerized mode (experimental)") glog.V(2).Info("Running kubelet in containerized mode (experimental)")
mounter = &mount.NsenterMounter{} mounter = &mount.NsenterMounter{}
} }
@ -347,7 +347,9 @@ func (s *KubeletServer) Run(_ []string) error {
MaxPods: s.MaxPods, MaxPods: s.MaxPods,
} }
RunKubelet(&kcfg, nil) if err := RunKubelet(&kcfg, nil); err != nil {
return err
}
if s.HealthzPort > 0 { if s.HealthzPort > 0 {
healthz.DefaultHealthz() healthz.DefaultHealthz()
@ -359,9 +361,12 @@ func (s *KubeletServer) Run(_ []string) error {
}, 5*time.Second) }, 5*time.Second)
} }
// runs forever if s.RunOnce {
select {} return nil
}
// run forever
select {}
} }
func (s *KubeletServer) authPathClientConfig(useDefaults bool) (*client.Config, error) { func (s *KubeletServer) authPathClientConfig(useDefaults bool) (*client.Config, error) {
@ -513,16 +518,16 @@ func SimpleKubelet(client *client.Client,
// 2 Kubelet binary // 2 Kubelet binary
// 3 Standalone 'kubernetes' binary // 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3 // Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) { func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride) kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname})
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
if kcfg.KubeClient != nil { if kcfg.KubeClient != nil {
glog.Infof("Sending events to api server.") glog.V(4).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events("")) eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events(""))
} else { } else {
glog.Infof("No api server defined - no events will be sent to API server.") glog.Warning("No api server defined - no events will be sent to API server.")
} }
capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources) capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources)
@ -536,19 +541,20 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) {
} }
k, podCfg, err := builder(kcfg) k, podCfg, err := builder(kcfg)
if err != nil { if err != nil {
glog.Errorf("Failed to create kubelet: %s", err) return fmt.Errorf("failed to create kubelet: %v", err)
return
} }
// process pods and exit. // process pods and exit.
if kcfg.Runonce { if kcfg.Runonce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil { if _, err := k.RunOnce(podCfg.Updates()); err != nil {
glog.Errorf("--runonce failed: %v", err) return fmt.Errorf("runonce failed: %v", err)
} }
glog.Infof("Started kubelet as runonce")
} else { } else {
startKubelet(k, podCfg, kcfg) startKubelet(k, podCfg, kcfg)
}
glog.Infof("Started kubelet") glog.Infof("Started kubelet")
} }
return nil
}
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) { func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet // start the kubelet

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"net/http" "net/http"
"sync"
) )
// HealthzChecker is a named healthz check. // HealthzChecker is a named healthz check.
@ -28,9 +29,13 @@ type HealthzChecker interface {
Check(req *http.Request) error Check(req *http.Request) error
} }
var defaultHealthz = sync.Once{}
// DefaultHealthz installs the default healthz check to the http.DefaultServeMux. // DefaultHealthz installs the default healthz check to the http.DefaultServeMux.
func DefaultHealthz(checks ...HealthzChecker) { func DefaultHealthz(checks ...HealthzChecker) {
defaultHealthz.Do(func() {
InstallHandler(http.DefaultServeMux, checks...) InstallHandler(http.DefaultServeMux, checks...)
})
} }
// PingHealthz returns true automatically when checked // PingHealthz returns true automatically when checked

View File

@ -277,7 +277,7 @@ func getDockerEndpoint(dockerEndpoint string) string {
func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
if dockerEndpoint == "fake://" { if dockerEndpoint == "fake://" {
return &FakeDockerClient{ return &FakeDockerClient{
VersionInfo: docker.Env{"ApiVersion=1.16"}, VersionInfo: docker.Env{"ApiVersion=1.18"},
} }
} }
client, err := docker.NewClient(getDockerEndpoint(dockerEndpoint)) client, err := docker.NewClient(getDockerEndpoint(dockerEndpoint))