diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go index 76bb89e548e..6e440b25279 100644 --- a/pkg/kubelet/dockertools/kube_docker_client.go +++ b/pkg/kubelet/dockertools/kube_docker_client.go @@ -52,8 +52,13 @@ type kubeDockerClient struct { // Make sure that kubeDockerClient implemented the DockerInterface. var _ DockerInterface = &kubeDockerClient{} -// the default ShmSize to use (in bytes) if not specified. -const defaultShmSize = int64(1024 * 1024 * 64) +const ( + // defaultTimeout is the default timeout of all docker operations. + defaultTimeout = 2 * time.Minute + + // defaultShmSize is the default ShmSize to use (in bytes) if not specified. + defaultShmSize = int64(1024 * 1024 * 64) +) // newKubeDockerClient creates an kubeDockerClient from an existing docker client. func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface { @@ -62,27 +67,26 @@ func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface { } } -// getDefaultContext returns the default context, now the default context is -// context.Background() -// TODO(random-liu): Add timeout and timeout handling mechanism. -func getDefaultContext() context.Context { - return context.Background() -} - func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) { - containers, err := k.client.ContainerList(getDefaultContext(), options) + ctx, cancel := getDefaultContext() + defer cancel() + containers, err := k.client.ContainerList(ctx, options) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } - apiContainers := []dockertypes.Container{} - for _, c := range containers { - apiContainers = append(apiContainers, dockertypes.Container(c)) - } - return apiContainers, nil + return containers, nil } func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) { - containerJSON, err := d.client.ContainerInspect(getDefaultContext(), id) + ctx, cancel := getDefaultContext() + defer cancel() + containerJSON, err := d.client.ContainerInspect(ctx, id) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { if dockerapi.IsErrContainerNotFound(err) { return nil, containerNotFoundError{ID: id} @@ -93,12 +97,17 @@ func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS } func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) { + ctx, cancel := getDefaultContext() + defer cancel() // we provide an explicit default shm size as to not depend on docker daemon. // TODO: evaluate exposing this as a knob in the API if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 { opts.HostConfig.ShmSize = defaultShmSize } - createResp, err := d.client.ContainerCreate(getDefaultContext(), opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name) + createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } @@ -106,20 +115,43 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi } func (d *kubeDockerClient) StartContainer(id string) error { - return d.client.ContainerStart(getDefaultContext(), id) + ctx, cancel := getDefaultContext() + defer cancel() + err := d.client.ContainerStart(ctx, id) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } + return err } // Stopping an already stopped container will not cause an error in engine-api. func (d *kubeDockerClient) StopContainer(id string, timeout int) error { - return d.client.ContainerStop(getDefaultContext(), id, timeout) + ctx, cancel := getDefaultContext() + defer cancel() + err := d.client.ContainerStop(ctx, id, timeout) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } + return err } func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error { - return d.client.ContainerRemove(getDefaultContext(), id, opts) + ctx, cancel := getDefaultContext() + defer cancel() + err := d.client.ContainerRemove(ctx, id, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } + return err } func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) { - resp, _, err := d.client.ImageInspectWithRaw(getDefaultContext(), image, true) + ctx, cancel := getDefaultContext() + defer cancel() + resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { if dockerapi.IsErrImageNotFound(err) { err = imageNotFoundError{ID: image} @@ -130,11 +162,22 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect } func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) { - return d.client.ImageHistory(getDefaultContext(), id) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ImageHistory(ctx, id) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } + return resp, err } func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) { - images, err := d.client.ImageList(getDefaultContext(), opts) + ctx, cancel := getDefaultContext() + defer cancel() + images, err := d.client.ImageList(ctx, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } @@ -155,8 +198,13 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, if err != nil { return err } + ctx, cancel := getDefaultContext() + defer cancel() opts.RegistryAuth = base64Auth - resp, err := d.client.ImagePull(getDefaultContext(), image, opts) + resp, err := d.client.ImagePull(ctx, image, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } if err != nil { return err } @@ -180,11 +228,22 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, } func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) { - return d.client.ImageRemove(getDefaultContext(), image, opts) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ImageRemove(ctx, image, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } + return resp, err } func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error { - resp, err := d.client.ContainerLogs(getDefaultContext(), id, opts) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ContainerLogs(ctx, id, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } if err != nil { return err } @@ -193,7 +252,12 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions } func (d *kubeDockerClient) Version() (*dockertypes.Version, error) { - resp, err := d.client.ServerVersion(getDefaultContext()) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ServerVersion(ctx) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } @@ -201,7 +265,12 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) { } func (d *kubeDockerClient) Info() (*dockertypes.Info, error) { - resp, err := d.client.Info(getDefaultContext()) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.Info(ctx) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } @@ -210,7 +279,12 @@ func (d *kubeDockerClient) Info() (*dockertypes.Info, error) { // TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did. func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) { - resp, err := d.client.ContainerExecCreate(getDefaultContext(), id, opts) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ContainerExecCreate(ctx, id, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } @@ -218,13 +292,22 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (* } func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error { + ctx, cancel := getDefaultContext() + defer cancel() if opts.Detach { - return d.client.ContainerExecStart(getDefaultContext(), startExec, opts) + err := d.client.ContainerExecStart(ctx, startExec, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } + return err } - resp, err := d.client.ContainerExecAttach(getDefaultContext(), startExec, dockertypes.ExecConfig{ + resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecConfig{ Detach: opts.Detach, Tty: opts.Tty, }) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } if err != nil { return err } @@ -233,7 +316,12 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar } func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) { - resp, err := d.client.ContainerExecInspect(getDefaultContext(), id) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ContainerExecInspect(ctx, id) + if ctxErr := contextError(ctx); ctxErr != nil { + return nil, ctxErr + } if err != nil { return nil, err } @@ -241,7 +329,12 @@ func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns } func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error { - resp, err := d.client.ContainerAttach(getDefaultContext(), id, opts) + ctx, cancel := getDefaultContext() + defer cancel() + resp, err := d.client.ContainerAttach(ctx, id, opts) + if ctxErr := contextError(ctx); ctxErr != nil { + return ctxErr + } if err != nil { return err } @@ -303,6 +396,18 @@ func parseDockerTimestamp(s string) (time.Time, error) { return time.Parse(time.RFC3339Nano, s) } +func getDefaultContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), defaultTimeout) +} + +// contextError checks the context, and returns error if the context is timeout. +func contextError(ctx context.Context) error { + if ctx.Err() == context.DeadlineExceeded { + return operationTimeout{err: ctx.Err()} + } + return ctx.Err() +} + // StreamOptions are the options used to configure the stream redirection type StreamOptions struct { RawTerminal bool @@ -311,6 +416,15 @@ type StreamOptions struct { ErrorStream io.Writer } +// operationTimeout is the error returned when the docker operations are timeout. +type operationTimeout struct { + err error +} + +func (e operationTimeout) Error() string { + return fmt.Sprintf("operation timeout: %v", e.err) +} + // containerNotFoundError is the error returned by InspectContainer when container not found. We // add this error type for testability. We don't use the original error returned by engine-api // because dockertypes.containerNotFoundError is private, we can't create and inject it in our test. @@ -319,7 +433,7 @@ type containerNotFoundError struct { } func (e containerNotFoundError) Error() string { - return fmt.Sprintf("Error: No such container: %s", e.ID) + return fmt.Sprintf("no such container: %q", e.ID) } // imageNotFoundError is the error returned by InspectImage when image not found. @@ -328,5 +442,5 @@ type imageNotFoundError struct { } func (e imageNotFoundError) Error() string { - return fmt.Sprintf("Error: No such image: %s", e.ID) + return fmt.Sprintf("no such image: %q", e.ID) }