Add runtime-request-timeout kubelet flag.

This commit is contained in:
Random-Liu 2016-06-16 14:48:36 -07:00
parent 04fd079d09
commit 52ebd4ecf1
10 changed files with 76 additions and 40 deletions

View File

@ -85,6 +85,7 @@ func NewKubeletServer() *KubeletServer {
CgroupRoot: "",
ConfigureCBR0: false,
ContainerRuntime: "docker",
RuntimeRequestTimeout: unversioned.Duration{Duration: 2 * time.Minute},
CPUCFSQuota: true,
DockerExecHandlerName: "native",
EventBurst: 10,
@ -227,6 +228,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.CgroupRoot, "cgroup-root", s.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'. Default: 'docker'.")
fs.DurationVar(&s.RuntimeRequestTimeout.Duration, "runtime-request-timeout", s.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later. Default: 2m0s")
fs.StringVar(&s.LockFilePath, "lock-file", s.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")
fs.BoolVar(&s.ExitOnLockContention, "exit-on-lock-contention", s.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.")
fs.StringVar(&s.RktPath, "rkt-path", s.RktPath, "Path of rkt binary. Leave empty to use the first rkt in $PATH. Only used if --container-runtime='rkt'.")

View File

@ -212,7 +212,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
ContainerRuntime: s.ContainerRuntime,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
RuntimeCgroups: s.RuntimeCgroups,
DockerExecHandler: dockerExecHandler,
EnableControllerAttachDetach: s.EnableControllerAttachDetach,

View File

@ -184,7 +184,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
mockDriver = &MockExecutorDriver{}
registry = newFakeRegistry()
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
NodeInfos: make(chan NodeInfo, 1),
Registry: registry,
})
@ -387,7 +387,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
kubeletFinished = make(chan struct{})
registry = newFakeRegistry()
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
NodeInfos: make(chan NodeInfo, 1),
ShutdownAlert: func() {
close(kubeletFinished)
@ -584,7 +584,7 @@ func TestExecutorShutdown(t *testing.T) {
kubeletFinished = make(chan struct{})
exitCalled = int32(0)
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
NodeInfos: make(chan NodeInfo, 1),
ShutdownAlert: func() {
close(kubeletFinished)

View File

@ -75,7 +75,7 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
func NewTestKubernetesExecutor() *Executor {
return New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
Registry: newFakeRegistry(),
})
}

View File

@ -107,7 +107,7 @@ func (s *KubeletExecutorServer) runExecutor(
exec := executor.New(executor.Config{
Registry: registry,
APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, 0),
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,

View File

@ -398,6 +398,7 @@ root-dir
run-proxy
runtime-cgroups
runtime-config
runtime-request-timeout
save-config
scheduler-config
scheduler-name

View File

@ -263,6 +263,9 @@ type KubeletConfiguration struct {
CgroupRoot string `json:"cgroupRoot,omitempty"`
// containerRuntime is the container runtime to use.
ContainerRuntime string `json:"containerRuntime"`
// runtimeRequestTimeout is the timeout for all runtime requests except long running
// requests - pull, logs, exec and attach.
RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"`
// rktPath is the path of rkt binary. Leave empty to use the first rkt in
// $PATH.
RktPath string `json:"rktPath,omitempty"`

View File

@ -23,6 +23,7 @@ import (
"path"
"strconv"
"strings"
"time"
dockerref "github.com/docker/distribution/reference"
"github.com/docker/docker/pkg/jsonmessage"
@ -311,8 +312,11 @@ func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
// ConnectToDockerOrDie creates docker client connecting to docker daemon.
// If the endpoint passed in is "fake://", a fake docker client
// will be returned. The program exits if error occurs.
func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
// will be returned. The program exits if an error occurs. The requestTimeout
// is the timeout for docker requests. If the timeout is exceeded, the request
// will be cancelled and an error will be returned. If requestTimeout is 0, a
// default value will be applied.
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
if dockerEndpoint == "fake://" {
return NewFakeDockerClient()
}
@ -320,7 +324,8 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
if err != nil {
glog.Fatalf("Couldn't connect to docker: %v", err)
}
return newKubeDockerClient(client)
glog.Infof("Start docker client with request timeout=%v", requestTimeout)
return newKubeDockerClient(client, requestTimeout)
}
// milliCPUToQuota converts milliCPU to CFS quota and period values

View File

@ -49,14 +49,22 @@ import (
// TODO(random-liu): Switch to new docker interface by refactoring the functions in the old DockerInterface
// one by one.
type kubeDockerClient struct {
client *dockerapi.Client
// timeout is the timeout of short running docker operations.
timeout time.Duration
client *dockerapi.Client
}
// Make sure that kubeDockerClient implemented the DockerInterface.
var _ DockerInterface = &kubeDockerClient{}
// There are 2 kinds of docker operations categorized by running time:
// * Long running operation: The long running operation could run for an arbitrarily long time, and the running time
// usually depends on some uncontrollable factors. These operations include: PullImage, Logs, StartExec, AttachToContainer.
// * Non-long running operation: Given the maximum load of the system, the non-long running operation should finish
// within an expected and usually short time. These include all other operations.
// kubeDockerClient only applies timeout on non-long running operations.
const (
// defaultTimeout is the default timeout of all docker operations.
// defaultTimeout is the default timeout of short running docker operations.
defaultTimeout = 2 * time.Minute
// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
@ -69,20 +77,26 @@ const (
// is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled.
// Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
// between progress updates.
// TODO(random-liu): Make this configurable
defaultImagePullingStuckTimeout = 1 * time.Minute
)
// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
// newKubeDockerClient creates a kubeDockerClient from an existing docker client. If requestTimeout is 0,
// defaultTimeout will be applied.
func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface {
if requestTimeout == 0 {
requestTimeout = defaultTimeout
}
return &kubeDockerClient{
client: dockerClient,
client: dockerClient,
timeout: requestTimeout,
}
}
func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
ctx, cancel := getDefaultContext()
func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
containers, err := k.client.ContainerList(ctx, options)
containers, err := d.client.ContainerList(ctx, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
@ -93,7 +107,7 @@ func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptio
}
func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
containerJSON, err := d.client.ContainerInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -109,7 +123,7 @@ func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS
}
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
@ -127,7 +141,7 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi
}
func (d *kubeDockerClient) StartContainer(id string) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStart(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -138,7 +152,7 @@ func (d *kubeDockerClient) StartContainer(id string) error {
// Stopping an already stopped container will not cause an error in engine-api.
func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStop(ctx, id, timeout)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -148,7 +162,7 @@ func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
}
func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerRemove(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -158,7 +172,7 @@ func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.Container
}
func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -174,7 +188,7 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect
}
func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ImageHistory(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -184,7 +198,7 @@ func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory,
}
func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
images, err := d.client.ImageList(ctx, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -297,7 +311,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
return err
}
opts.RegistryAuth = base64Auth
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ImagePull(ctx, image, opts)
if err != nil {
@ -326,7 +340,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
}
func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ImageRemove(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -336,8 +350,12 @@ func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemov
}
func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
// Don't set timeout for log calls
resp, err := d.client.ContainerLogs(context.Background(), id, opts)
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ContainerLogs(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -346,7 +364,7 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions
}
func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ServerVersion(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -359,7 +377,7 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
}
func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.Info(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -373,7 +391,7 @@ func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
// TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ContainerExecCreate(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -386,7 +404,7 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*
}
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getCancelableContext()
defer cancel()
if opts.Detach {
err := d.client.ContainerExecStart(ctx, startExec, opts)
@ -410,7 +428,7 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar
}
func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ContainerExecInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -423,7 +441,7 @@ func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns
}
func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ContainerAttach(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -484,16 +502,23 @@ func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reade
return nil
}
// getCancelableContext returns a context derived from the background context
// together with its cancel function; no deadline is attached. It is meant for
// the long running requests, which must not time out but should still release
// the context's resources once the caller invokes cancel.
func (d *kubeDockerClient) getCancelableContext() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	return ctx, cancel
}
// getTimeoutContext returns a context derived from the background context that
// expires after the client's configured request timeout (d.timeout), together
// with its cancel function.
func (d *kubeDockerClient) getTimeoutContext() (context.Context, context.CancelFunc) {
	parent := context.Background()
	return context.WithTimeout(parent, d.timeout)
}
// parseDockerTimestamp converts a timestamp string returned by DockerInterface
// into a time.Time. It returns the parse error unchanged when s is not a valid
// timestamp.
func parseDockerTimestamp(s string) (time.Time, error) {
	// Docker reports timestamps in time.RFC3339Nano format.
	const dockerTimeLayout = time.RFC3339Nano
	parsed, err := time.Parse(dockerTimeLayout, s)
	return parsed, err
}
// getDefaultContext returns a context derived from the background context that
// expires after defaultTimeout, together with its cancel function.
func getDefaultContext() (context.Context, context.CancelFunc) {
	parent := context.Background()
	return context.WithTimeout(parent, defaultTimeout)
}
// contextError checks the context, and returns error if the context is timeout.
func contextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded {

View File

@ -40,7 +40,7 @@ func NewConformanceImage(containerRuntime string, image string) (ci ConformanceI
//TODO: do not expose kubelet implementation details after we refactor the runtime API.
func dockerRuntime() kubecontainer.Runtime {
dockerClient := dockertools.ConnectToDockerOrDie("")
dockerClient := dockertools.ConnectToDockerOrDie("", 0)
pm := kubepod.NewBasicPodManager(nil)
dm := dockertools.NewDockerManager(
dockerClient,