Merge pull request #7729 from yujuhong/runtime_up

Kubelet: wait until container runtime is up
This commit is contained in:
Victor Marmol 2015-05-04 16:02:30 -07:00
commit a32d31d045
4 changed files with 26 additions and 19 deletions

View File

@ -60,6 +60,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider" _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
docker "github.com/fsouza/go-dockerclient"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog" "github.com/golang/glog"
@ -232,6 +233,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
testRootDir := makeTempDirOrDie("kubelet_integ_1.", "") testRootDir := makeTempDirOrDie("kubelet_integ_1.", "")
configFilePath := makeTempDirOrDie("config", testRootDir) configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.15"}
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{}) kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil) kubeletapp.RunKubelet(kcfg, nil)
// Kubelet (machine) // Kubelet (machine)
@ -239,6 +241,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
// have a place they can schedule. // have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "") testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir) glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
fakeDocker2.VersionInfo = docker.Env{"ApiVersion=1.15"}
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{}) kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil) kubeletapp.RunKubelet(kcfg, nil)
return apiServer.URL, configFilePath return apiServer.URL, configFilePath

View File

@ -263,7 +263,7 @@ func getDockerEndpoint(dockerEndpoint string) string {
func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
if dockerEndpoint == "fake://" { if dockerEndpoint == "fake://" {
return &FakeDockerClient{ return &FakeDockerClient{
VersionInfo: []string{"apiVersion=1.16"}, VersionInfo: docker.Env{"ApiVersion=1.16"},
} }
} }
client, err := docker.NewClient(getDockerEndpoint(dockerEndpoint)) client, err := docker.NewClient(getDockerEndpoint(dockerEndpoint))

View File

@ -844,6 +844,7 @@ func (dm *DockerManager) Version() (kubecontainer.Version, error) {
apiVersion := env.Get("ApiVersion") apiVersion := env.Get("ApiVersion")
version, err := docker.NewAPIVersion(apiVersion) version, err := docker.NewAPIVersion(apiVersion)
if err != nil { if err != nil {
glog.Errorf("docker: failed to parse docker server version %q: %v", apiVersion, err)
return nil, fmt.Errorf("docker: failed to parse docker server version %q: %v", apiVersion, err) return nil, fmt.Errorf("docker: failed to parse docker server version %q: %v", apiVersion, err)
} }
return dockerVersion(version), nil return dockerVersion(version), nil

View File

@ -60,8 +60,8 @@ import (
) )
const ( const (
// Max amount of time to wait for the Docker daemon to come up. // Max amount of time to wait for the container runtime to come up.
maxWaitForDocker = 5 * time.Minute maxWaitForContainerRuntime = 5 * time.Minute
// Initial node status update frequency and incremental frequency, for faster cluster startup. // Initial node status update frequency and incremental frequency, for faster cluster startup.
// The update frequency will be increameted linearly, until it reaches status_update_frequency. // The update frequency will be increameted linearly, until it reaches status_update_frequency.
@ -95,6 +95,20 @@ type SourcesReadyFn func() bool
type volumeMap map[string]volume.Volume type volumeMap map[string]volume.Volume
// Wait for the container runtime to be up with a timeout.
func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error {
var err error = nil
waitStart := time.Now()
for time.Since(waitStart) < timeout {
_, err = cr.Version()
if err == nil {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return err
}
// New creates a new Kubelet for use in main // New creates a new Kubelet for use in main
func NewMainKubelet( func NewMainKubelet(
hostname string, hostname string,
@ -132,22 +146,6 @@ func NewMainKubelet(
} }
dockerClient = dockertools.NewInstrumentedDockerInterface(dockerClient) dockerClient = dockertools.NewInstrumentedDockerInterface(dockerClient)
// Wait for the Docker daemon to be up (with a timeout).
waitStart := time.Now()
dockerUp := false
for time.Since(waitStart) < maxWaitForDocker {
_, err := dockerClient.Version()
if err == nil {
dockerUp = true
break
}
time.Sleep(100 * time.Millisecond)
}
if !dockerUp {
return nil, fmt.Errorf("timed out waiting for Docker to come up")
}
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc) serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil { if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
@ -267,6 +265,11 @@ func NewMainKubelet(
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime) return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
} }
// Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
}
klet.runner = klet.containerRuntime klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient) klet.podManager = newBasicPodManager(klet.kubeClient)