Add timeout for all docker operation.

This commit is contained in:
Random-Liu 2016-05-06 10:24:56 -07:00
parent 16159b8bd0
commit 66678354a0

View File

@ -52,8 +52,13 @@ type kubeDockerClient struct {
// Make sure that kubeDockerClient implemented the DockerInterface.
var _ DockerInterface = &kubeDockerClient{}
// the default ShmSize to use (in bytes) if not specified.
const defaultShmSize = int64(1024 * 1024 * 64)
const (
// defaultTimeout is the default timeout of all docker operations.
defaultTimeout = 2 * time.Minute
// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
defaultShmSize = int64(1024 * 1024 * 64)
)
// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
@ -62,27 +67,26 @@ func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
}
}
// getDefaultContext returns the default context, now the default context is
// context.Background()
// TODO(random-liu): Add timeout and timeout handling mechanism.
func getDefaultContext() context.Context {
return context.Background()
}
func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
containers, err := k.client.ContainerList(getDefaultContext(), options)
ctx, cancel := getDefaultContext()
defer cancel()
containers, err := k.client.ContainerList(ctx, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
apiContainers := []dockertypes.Container{}
for _, c := range containers {
apiContainers = append(apiContainers, dockertypes.Container(c))
}
return apiContainers, nil
return containers, nil
}
func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
containerJSON, err := d.client.ContainerInspect(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
containerJSON, err := d.client.ContainerInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
if dockerapi.IsErrContainerNotFound(err) {
return nil, containerNotFoundError{ID: id}
@ -93,12 +97,17 @@ func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS
}
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
ctx, cancel := getDefaultContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
opts.HostConfig.ShmSize = defaultShmSize
}
createResp, err := d.client.ContainerCreate(getDefaultContext(), opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -106,20 +115,43 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi
}
func (d *kubeDockerClient) StartContainer(id string) error {
return d.client.ContainerStart(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
err := d.client.ContainerStart(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
// Stopping an already stopped container will not cause an error in engine-api.
func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
return d.client.ContainerStop(getDefaultContext(), id, timeout)
ctx, cancel := getDefaultContext()
defer cancel()
err := d.client.ContainerStop(ctx, id, timeout)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
return d.client.ContainerRemove(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
err := d.client.ContainerRemove(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) {
resp, _, err := d.client.ImageInspectWithRaw(getDefaultContext(), image, true)
ctx, cancel := getDefaultContext()
defer cancel()
resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
if dockerapi.IsErrImageNotFound(err) {
err = imageNotFoundError{ID: image}
@ -130,11 +162,22 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect
}
func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
return d.client.ImageHistory(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ImageHistory(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return resp, err
}
func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
images, err := d.client.ImageList(getDefaultContext(), opts)
ctx, cancel := getDefaultContext()
defer cancel()
images, err := d.client.ImageList(ctx, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -155,8 +198,13 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
if err != nil {
return err
}
ctx, cancel := getDefaultContext()
defer cancel()
opts.RegistryAuth = base64Auth
resp, err := d.client.ImagePull(getDefaultContext(), image, opts)
resp, err := d.client.ImagePull(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -180,11 +228,22 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
}
func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
return d.client.ImageRemove(getDefaultContext(), image, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ImageRemove(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return resp, err
}
func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
resp, err := d.client.ContainerLogs(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerLogs(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -193,7 +252,12 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions
}
func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
resp, err := d.client.ServerVersion(getDefaultContext())
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ServerVersion(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -201,7 +265,12 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
}
func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
resp, err := d.client.Info(getDefaultContext())
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.Info(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -210,7 +279,12 @@ func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
// TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) {
resp, err := d.client.ContainerExecCreate(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerExecCreate(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -218,13 +292,22 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*
}
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
ctx, cancel := getDefaultContext()
defer cancel()
if opts.Detach {
return d.client.ContainerExecStart(getDefaultContext(), startExec, opts)
err := d.client.ContainerExecStart(ctx, startExec, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
resp, err := d.client.ContainerExecAttach(getDefaultContext(), startExec, dockertypes.ExecConfig{
resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecConfig{
Detach: opts.Detach,
Tty: opts.Tty,
})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -233,7 +316,12 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar
}
func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
resp, err := d.client.ContainerExecInspect(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerExecInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -241,7 +329,12 @@ func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns
}
func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
resp, err := d.client.ContainerAttach(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerAttach(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -303,6 +396,18 @@ func parseDockerTimestamp(s string) (time.Time, error) {
return time.Parse(time.RFC3339Nano, s)
}
func getDefaultContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultTimeout)
}
// contextError checks the context, and returns error if the context is timeout.
func contextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded {
return operationTimeout{err: ctx.Err()}
}
return ctx.Err()
}
// StreamOptions are the options used to configure the stream redirection
type StreamOptions struct {
RawTerminal bool
@ -311,6 +416,15 @@ type StreamOptions struct {
ErrorStream io.Writer
}
// operationTimeout is the error returned when the docker operations are timeout.
type operationTimeout struct {
err error
}
func (e operationTimeout) Error() string {
return fmt.Sprintf("operation timeout: %v", e.err)
}
// containerNotFoundError is the error returned by InspectContainer when container not found. We
// add this error type for testability. We don't use the original error returned by engine-api
// because dockertypes.containerNotFoundError is private, we can't create and inject it in our test.
@ -319,7 +433,7 @@ type containerNotFoundError struct {
}
func (e containerNotFoundError) Error() string {
return fmt.Sprintf("Error: No such container: %s", e.ID)
return fmt.Sprintf("no such container: %q", e.ID)
}
// imageNotFoundError is the error returned by InspectImage when image not found.
@ -328,5 +442,5 @@ type imageNotFoundError struct {
}
func (e imageNotFoundError) Error() string {
return fmt.Sprintf("Error: No such image: %s", e.ID)
return fmt.Sprintf("no such image: %q", e.ID)
}