diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 16bb74d4bb2..795bcd60639 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -85,6 +85,7 @@ func NewKubeletServer() *KubeletServer { CgroupRoot: "", ConfigureCBR0: false, ContainerRuntime: "docker", + RuntimeRequestTimeout: unversioned.Duration{Duration: 2 * time.Minute}, CPUCFSQuota: true, DockerExecHandlerName: "native", EventBurst: 10, @@ -227,6 +228,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.CgroupRoot, "cgroup-root", s.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.") fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'. Default: 'docker'.") + fs.DurationVar(&s.RuntimeRequestTimeout.Duration, "runtime-request-timeout", s.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later. Default: 2m0s") fs.StringVar(&s.LockFilePath, "lock-file", s.LockFilePath, " The path to file for kubelet to use as a lock file.") fs.BoolVar(&s.ExitOnLockContention, "exit-on-lock-contention", s.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.") fs.StringVar(&s.RktPath, "rkt-path", s.RktPath, "Path of rkt binary. Leave empty to use the first rkt in $PATH. 
Only used if --container-runtime='rkt'.") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index a27e7d1d4c1..72e69e311e0 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -212,7 +212,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { ContainerRuntime: s.ContainerRuntime, CPUCFSQuota: s.CPUCFSQuota, DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt. RuntimeCgroups: s.RuntimeCgroups, DockerExecHandler: dockerExecHandler, EnableControllerAttachDetach: s.EnableControllerAttachDetach, diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 095511be3fe..519904072e6 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -184,7 +184,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { mockDriver = &MockExecutorDriver{} registry = newFakeRegistry() executor = New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), + Docker: dockertools.ConnectToDockerOrDie("fake://", 0), NodeInfos: make(chan NodeInfo, 1), Registry: registry, }) @@ -387,7 +387,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { kubeletFinished = make(chan struct{}) registry = newFakeRegistry() executor = New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), + Docker: dockertools.ConnectToDockerOrDie("fake://", 0), NodeInfos: make(chan NodeInfo, 1), ShutdownAlert: func() { close(kubeletFinished) @@ -584,7 +584,7 @@ func TestExecutorShutdown(t *testing.T) { kubeletFinished = make(chan struct{}) exitCalled = int32(0) executor = New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), + Docker: dockertools.ConnectToDockerOrDie("fake://", 0), NodeInfos: make(chan NodeInfo, 1), 
ShutdownAlert: func() { close(kubeletFinished) diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index fe70393a0b8..1aec5ade132 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -75,7 +75,7 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status func NewTestKubernetesExecutor() *Executor { return New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), + Docker: dockertools.ConnectToDockerOrDie("fake://", 0), Registry: newFakeRegistry(), }) } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index a8b336b2deb..03c2612c8a8 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -107,7 +107,7 @@ func (s *KubeletExecutorServer) runExecutor( exec := executor.New(executor.Config{ Registry: registry, APIClient: apiclient, - Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, 0), SuicideTimeout: s.SuicideTimeout, KubeletFinished: kubeletFinished, ExitFunc: os.Exit, diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 721fa9bb3c6..d4223eb8ff5 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -398,6 +398,7 @@ root-dir run-proxy runtime-cgroups runtime-config +runtime-request-timeout save-config scheduler-config scheduler-name diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go index 79619687512..d4fb8616e25 100644 --- a/pkg/apis/componentconfig/types.go +++ b/pkg/apis/componentconfig/types.go @@ -263,6 +263,9 @@ type KubeletConfiguration struct { CgroupRoot string `json:"cgroupRoot,omitempty"` // containerRuntime is the container runtime to use. 
ContainerRuntime string `json:"containerRuntime"` + // runtimeRequestTimeout is the timeout for all runtime requests except long running + // requests - pull, logs, exec and attach. + RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"` // rktPath is the path of rkt binary. Leave empty to use the first rkt in // $PATH. RktPath string `json:"rktPath,omitempty"` diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index d265db51250..be94cf5fe28 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -23,6 +23,7 @@ import ( "path" "strconv" "strings" + "time" dockerref "github.com/docker/distribution/reference" "github.com/docker/docker/pkg/jsonmessage" @@ -311,8 +312,11 @@ func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) { // ConnectToDockerOrDie creates docker client connecting to docker daemon. // If the endpoint passed in is "fake://", a fake docker client -// will be returned. The program exits if error occurs. -func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { +// will be returned. The program exits if error occurs. The requestTimeout +// is the timeout for docker requests. If timeout is exceeded, the request +// will be cancelled and throw out an error. If requestTimeout is 0, a default +// value will be applied. 
+func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface { if dockerEndpoint == "fake://" { return NewFakeDockerClient() } @@ -320,7 +324,8 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { if err != nil { glog.Fatalf("Couldn't connect to docker: %v", err) } - return newKubeDockerClient(client) + glog.Infof("Start docker client with request timeout=%v", requestTimeout) + return newKubeDockerClient(client, requestTimeout) } // milliCPUToQuota converts milliCPU to CFS quota and period values diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go index 013507f0560..d390eff1081 100644 --- a/pkg/kubelet/dockertools/kube_docker_client.go +++ b/pkg/kubelet/dockertools/kube_docker_client.go @@ -49,14 +49,22 @@ import ( // TODO(random-liu): Swith to new docker interface by refactoring the functions in the old DockerInterface // one by one. type kubeDockerClient struct { - client *dockerapi.Client + // timeout is the timeout of short running docker operations. + timeout time.Duration + client *dockerapi.Client } // Make sure that kubeDockerClient implemented the DockerInterface. var _ DockerInterface = &kubeDockerClient{} +// There are 2 kinds of docker operations categorized by running time: +// * Long running operation: The long running operation could run for arbitrary long time, and the running time +// usually depends on some uncontrollable factors. These operations include: PullImage, Logs, StartExec, AttachToContainer. +// * Non-long running operation: Given the maximum load of the system, the non-long running operation should finish +// in expected and usually short time. These include all other operations. +// kubeDockerClient only applies timeout on non-long running operations. const ( - // defaultTimeout is the default timeout of all docker operations. + // defaultTimeout is the default timeout of short running docker operations. 
defaultTimeout = 2 * time.Minute // defaultShmSize is the default ShmSize to use (in bytes) if not specified. @@ -69,20 +77,26 @@ const ( // is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled. // Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval // between progress updates. + // TODO(random-liu): Make this configurable defaultImagePullingStuckTimeout = 1 * time.Minute ) -// newKubeDockerClient creates an kubeDockerClient from an existing docker client. -func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface { +// newKubeDockerClient creates a kubeDockerClient from an existing docker client. If requestTimeout is 0, +// defaultTimeout will be applied. +func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface { + if requestTimeout == 0 { + requestTimeout = defaultTimeout + } return &kubeDockerClient{ - client: dockerClient, + client: dockerClient, + timeout: requestTimeout, } } -func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) { - ctx, cancel := getDefaultContext() +func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) { + ctx, cancel := d.getTimeoutContext() defer cancel() - containers, err := k.client.ContainerList(ctx, options) + containers, err := d.client.ContainerList(ctx, options) if ctxErr := contextError(ctx); ctxErr != nil { return nil, ctxErr } @@ -93,7 +107,7 @@ func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptio } func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() containerJSON, err := d.client.ContainerInspect(ctx, id) if ctxErr := contextError(ctx); ctxErr != nil { @@ -109,7 +123,7 @@ func (d *kubeDockerClient) 
InspectContainer(id string) (*dockertypes.ContainerJS } func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() // we provide an explicit default shm size as to not depend on docker daemon. // TODO: evaluate exposing this as a knob in the API @@ -127,7 +141,7 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi } func (d *kubeDockerClient) StartContainer(id string) error { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() err := d.client.ContainerStart(ctx, id) if ctxErr := contextError(ctx); ctxErr != nil { @@ -138,7 +152,7 @@ func (d *kubeDockerClient) StartContainer(id string) error { // Stopping an already stopped container will not cause an error in engine-api. func (d *kubeDockerClient) StopContainer(id string, timeout int) error { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() err := d.client.ContainerStop(ctx, id, timeout) if ctxErr := contextError(ctx); ctxErr != nil { @@ -148,7 +162,7 @@ func (d *kubeDockerClient) StopContainer(id string, timeout int) error { } func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() err := d.client.ContainerRemove(ctx, id, opts) if ctxErr := contextError(ctx); ctxErr != nil { @@ -158,7 +172,7 @@ func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.Container } func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true) if ctxErr := contextError(ctx); ctxErr != nil { @@ -174,7 +188,7 @@ func (d *kubeDockerClient) 
InspectImage(image string) (*dockertypes.ImageInspect } func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, err := d.client.ImageHistory(ctx, id) if ctxErr := contextError(ctx); ctxErr != nil { @@ -184,7 +198,7 @@ func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, } func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() images, err := d.client.ImageList(ctx, opts) if ctxErr := contextError(ctx); ctxErr != nil { @@ -297,7 +311,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, return err } opts.RegistryAuth = base64Auth - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := d.getCancelableContext() defer cancel() resp, err := d.client.ImagePull(ctx, image, opts) if err != nil { @@ -326,7 +340,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, } func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, err := d.client.ImageRemove(ctx, image, opts) if ctxErr := contextError(ctx); ctxErr != nil { @@ -336,8 +350,12 @@ func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemov } func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error { - // Don't set timeout for log calls - resp, err := d.client.ContainerLogs(context.Background(), id, opts) + ctx, cancel := d.getCancelableContext() + defer cancel() + resp, err := d.client.ContainerLogs(ctx, id, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } if err != nil { return err } @@ 
-346,7 +364,7 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions } func (d *kubeDockerClient) Version() (*dockertypes.Version, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, err := d.client.ServerVersion(ctx) if ctxErr := contextError(ctx); ctxErr != nil { @@ -359,7 +377,7 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) { } func (d *kubeDockerClient) Info() (*dockertypes.Info, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, err := d.client.Info(ctx) if ctxErr := contextError(ctx); ctxErr != nil { @@ -373,7 +391,7 @@ func (d *kubeDockerClient) Info() (*dockertypes.Info, error) { // TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did. func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, err := d.client.ContainerExecCreate(ctx, id, opts) if ctxErr := contextError(ctx); ctxErr != nil { @@ -386,7 +404,7 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (* } func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getCancelableContext() defer cancel() if opts.Detach { err := d.client.ContainerExecStart(ctx, startExec, opts) @@ -410,7 +428,7 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar } func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getTimeoutContext() defer cancel() resp, err := d.client.ContainerExecInspect(ctx, id) if ctxErr := contextError(ctx); ctxErr != nil { @@ -423,7 +441,7 @@ func (d 
*kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns } func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error { - ctx, cancel := getDefaultContext() + ctx, cancel := d.getCancelableContext() defer cancel() resp, err := d.client.ContainerAttach(ctx, id, opts) if ctxErr := contextError(ctx); ctxErr != nil { @@ -484,16 +502,23 @@ func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reade return nil } +// getCancelableContext returns a new cancelable context. For long running requests without timeout, we use cancelable +// context to avoid potential resource leak, although the current implementation shouldn't leak resource. +func (d *kubeDockerClient) getCancelableContext() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) +} + +// getTimeoutContext returns a new context that expires after the client's configured request timeout (d.timeout). +func (d *kubeDockerClient) getTimeoutContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), d.timeout) +} + // parseDockerTimestamp parses the timestamp returned by DockerInterface from string to time.Time func parseDockerTimestamp(s string) (time.Time, error) { // Timestamp returned by Docker is in time.RFC3339Nano format. return time.Parse(time.RFC3339Nano, s) } -func getDefaultContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), defaultTimeout) -} - // contextError checks the context, and returns error if the context is timeout. 
func contextError(ctx context.Context) error { if ctx.Err() == context.DeadlineExceeded { diff --git a/test/e2e_node/image.go b/test/e2e_node/image.go index 8a47e5406ff..dfbbac5e78e 100644 --- a/test/e2e_node/image.go +++ b/test/e2e_node/image.go @@ -40,7 +40,7 @@ func NewConformanceImage(containerRuntime string, image string) (ci ConformanceI //TODO: do not expose kubelet implementation details after we refactor the runtime API. func dockerRuntime() kubecontainer.Runtime { - dockerClient := dockertools.ConnectToDockerOrDie("") + dockerClient := dockertools.ConnectToDockerOrDie("", 0) pm := kubepod.NewBasicPodManager(nil) dm := dockertools.NewDockerManager( dockerClient,