Merge pull request #27570 from Random-Liu/add-runtime-request-timeout-flag

Automatic merge from submit-queue

Add runtime-request-timeout kubelet flag.

XRef #23563.

Addresses https://github.com/kubernetes/kubernetes/issues/27388#issuecomment-226570083.

Add a new kubelet flag `runtime-request-timeout`, and set to 2 minutes by default.
Now the flag only affects dockertools, rkt may also want to set request timeout according to the flag. @yifan-gu 

This PR also removed the timeout for all long running operations to avoid issues like #27588 and #26122.

@yujuhong @rrati 
/cc @kubernetes/sig-node 

[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/.github/PULL_REQUEST_TEMPLATE.md?pixel)]()
This commit is contained in:
k8s-merge-robot 2016-06-21 01:26:54 -07:00 committed by GitHub
commit ee35555cb6
17 changed files with 974 additions and 822 deletions

View File

@ -85,6 +85,7 @@ func NewKubeletServer() *KubeletServer {
CgroupRoot: "",
ConfigureCBR0: false,
ContainerRuntime: "docker",
RuntimeRequestTimeout: unversioned.Duration{Duration: 2 * time.Minute},
CPUCFSQuota: true,
DockerExecHandlerName: "native",
EventBurst: 10,
@ -227,6 +228,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.CgroupRoot, "cgroup-root", s.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'. Default: 'docker'.")
fs.DurationVar(&s.RuntimeRequestTimeout.Duration, "runtime-request-timeout", s.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later. Default: 2m0s")
fs.StringVar(&s.LockFilePath, "lock-file", s.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")
fs.BoolVar(&s.ExitOnLockContention, "exit-on-lock-contention", s.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.")
fs.StringVar(&s.RktPath, "rkt-path", s.RktPath, "Path of rkt binary. Leave empty to use the first rkt in $PATH. Only used if --container-runtime='rkt'.")

View File

@ -210,9 +210,10 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime,
RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
RuntimeCgroups: s.RuntimeCgroups,
DockerExecHandler: dockerExecHandler,
EnableControllerAttachDetach: s.EnableControllerAttachDetach,
@ -783,6 +784,7 @@ type KubeletConfig struct {
ConfigureCBR0 bool
ContainerManager cm.ContainerManager
ContainerRuntime string
RuntimeRequestTimeout time.Duration
CPUCFSQuota bool
DiskSpacePolicy kubelet.DiskSpacePolicy
DockerClient dockertools.DockerInterface
@ -923,6 +925,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.OSInterface,
kc.CgroupRoot,
kc.ContainerRuntime,
kc.RuntimeRequestTimeout,
kc.RktPath,
kc.RktAPIEndpoint,
kc.RktStage1Image,

View File

@ -184,7 +184,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
mockDriver = &MockExecutorDriver{}
registry = newFakeRegistry()
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
NodeInfos: make(chan NodeInfo, 1),
Registry: registry,
})
@ -387,7 +387,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
kubeletFinished = make(chan struct{})
registry = newFakeRegistry()
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
NodeInfos: make(chan NodeInfo, 1),
ShutdownAlert: func() {
close(kubeletFinished)
@ -584,7 +584,7 @@ func TestExecutorShutdown(t *testing.T) {
kubeletFinished = make(chan struct{})
exitCalled = int32(0)
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
NodeInfos: make(chan NodeInfo, 1),
ShutdownAlert: func() {
close(kubeletFinished)

View File

@ -75,7 +75,7 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
func NewTestKubernetesExecutor() *Executor {
return New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Docker: dockertools.ConnectToDockerOrDie("fake://", 0),
Registry: newFakeRegistry(),
})
}

View File

@ -107,7 +107,7 @@ func (s *KubeletExecutorServer) runExecutor(
exec := executor.New(executor.Config{
Registry: registry,
APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, 0),
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,

View File

@ -399,6 +399,7 @@ root-dir
run-proxy
runtime-cgroups
runtime-config
runtime-request-timeout
save-config
scheduler-config
scheduler-name

View File

@ -273,6 +273,9 @@ func DeepCopy_componentconfig_KubeletConfiguration(in KubeletConfiguration, out
out.SystemCgroups = in.SystemCgroups
out.CgroupRoot = in.CgroupRoot
out.ContainerRuntime = in.ContainerRuntime
if err := unversioned.DeepCopy_unversioned_Duration(in.RuntimeRequestTimeout, &out.RuntimeRequestTimeout, c); err != nil {
return err
}
out.RktPath = in.RktPath
out.RktAPIEndpoint = in.RktAPIEndpoint
out.RktStage1Image = in.RktStage1Image

File diff suppressed because it is too large Load Diff

View File

@ -263,6 +263,9 @@ type KubeletConfiguration struct {
CgroupRoot string `json:"cgroupRoot,omitempty"`
// containerRuntime is the container runtime to use.
ContainerRuntime string `json:"containerRuntime"`
// runtimeRequestTimeout is the timeout for all runtime requests except long running
// requests - pull, logs, exec and attach.
RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"`
// rktPath is the path of rkt binary. Leave empty to use the first rkt in
// $PATH.
RktPath string `json:"rktPath,omitempty"`

View File

@ -23,6 +23,7 @@ import (
"path"
"strconv"
"strings"
"time"
dockerref "github.com/docker/distribution/reference"
"github.com/docker/docker/pkg/jsonmessage"
@ -311,8 +312,11 @@ func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
// ConnectToDockerOrDie creates docker client connecting to docker daemon.
// If the endpoint passed in is "fake://", a fake docker client
// will be returned. The program exits if error occurs.
func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
// will be returned. The program exits if error occurs. The requestTimeout
// is the timeout for docker requests. If timeout is exceeded, the request
// will be cancelled and throw out an error. If requestTimeout is 0, a default
// value will be applied.
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
if dockerEndpoint == "fake://" {
return NewFakeDockerClient()
}
@ -320,7 +324,8 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
if err != nil {
glog.Fatalf("Couldn't connect to docker: %v", err)
}
return newKubeDockerClient(client)
glog.Infof("Start docker client with request timeout=%v", requestTimeout)
return newKubeDockerClient(client, requestTimeout)
}
// milliCPUToQuota converts milliCPU to CFS quota and period values

View File

@ -49,14 +49,22 @@ import (
// TODO(random-liu): Swith to new docker interface by refactoring the functions in the old DockerInterface
// one by one.
type kubeDockerClient struct {
client *dockerapi.Client
// timeout is the timeout of short running docker operations.
timeout time.Duration
client *dockerapi.Client
}
// Make sure that kubeDockerClient implemented the DockerInterface.
var _ DockerInterface = &kubeDockerClient{}
// There are 2 kinds of docker operations categorized by running time:
// * Long running operation: The long running operation could run for arbitrary long time, and the running time
// usually depends on some uncontrollable factors. These operations include: PullImage, Logs, StartExec, AttachToContainer.
// * Non-long running operation: Given the maximum load of the system, the non-long running operation should finish
// in expected and usually short time. These include all other operations.
// kubeDockerClient only applies timeout on non-long running operations.
const (
// defaultTimeout is the default timeout of all docker operations.
// defaultTimeout is the default timeout of short running docker operations.
defaultTimeout = 2 * time.Minute
// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
@ -69,20 +77,26 @@ const (
// is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled.
// Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
// between progress updates.
// TODO(random-liu): Make this configurable
defaultImagePullingStuckTimeout = 1 * time.Minute
)
// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
// newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0,
// defaultTimeout will be applied.
func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface {
if requestTimeout == 0 {
requestTimeout = defaultTimeout
}
return &kubeDockerClient{
client: dockerClient,
client: dockerClient,
timeout: requestTimeout,
}
}
func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
ctx, cancel := getDefaultContext()
func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
containers, err := k.client.ContainerList(ctx, options)
containers, err := d.client.ContainerList(ctx, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
@ -93,7 +107,7 @@ func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptio
}
func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
containerJSON, err := d.client.ContainerInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -109,7 +123,7 @@ func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS
}
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
@ -127,7 +141,7 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi
}
func (d *kubeDockerClient) StartContainer(id string) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStart(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -138,7 +152,7 @@ func (d *kubeDockerClient) StartContainer(id string) error {
// Stopping an already stopped container will not cause an error in engine-api.
func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStop(ctx, id, timeout)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -148,7 +162,7 @@ func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
}
func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerRemove(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -158,7 +172,7 @@ func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.Container
}
func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -174,7 +188,7 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect
}
func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ImageHistory(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -184,7 +198,7 @@ func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory,
}
func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
images, err := d.client.ImageList(ctx, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -297,7 +311,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
return err
}
opts.RegistryAuth = base64Auth
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ImagePull(ctx, image, opts)
if err != nil {
@ -326,7 +340,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
}
func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ImageRemove(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -336,8 +350,12 @@ func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemov
}
func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
// Don't set timeout for log calls
resp, err := d.client.ContainerLogs(context.Background(), id, opts)
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ContainerLogs(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -346,7 +364,7 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions
}
func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ServerVersion(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -359,7 +377,7 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
}
func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.Info(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -373,7 +391,7 @@ func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
// TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ContainerExecCreate(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -386,7 +404,7 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*
}
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getCancelableContext()
defer cancel()
if opts.Detach {
err := d.client.ContainerExecStart(ctx, startExec, opts)
@ -410,7 +428,7 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar
}
func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ContainerExecInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -423,7 +441,7 @@ func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns
}
func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
ctx, cancel := getDefaultContext()
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ContainerAttach(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
@ -484,16 +502,23 @@ func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reade
return nil
}
// getCancelableContext returns a new cancelable context. For long running requests without timeout, we use cancelable
// context to avoid potential resource leak, although the current implementation shouldn't leak resource.
func (d *kubeDockerClient) getCancelableContext() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}
// getTimeoutContext returns a new context with default request timeout
func (d *kubeDockerClient) getTimeoutContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), d.timeout)
}
// parseDockerTimestamp parses the timestamp returned by DockerInterface from string to time.Time
func parseDockerTimestamp(s string) (time.Time, error) {
// Timestamp returned by Docker is in time.RFC3339Nano format.
return time.Parse(time.RFC3339Nano, s)
}
func getDefaultContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultTimeout)
}
// contextError checks the context, and returns error if the context is timeout.
func contextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded {

View File

@ -206,6 +206,7 @@ func NewMainKubelet(
osInterface kubecontainer.OSInterface,
cgroupRoot string,
containerRuntime string,
runtimeRequestTimeout time.Duration,
rktPath string,
rktAPIEndpoint string,
rktStage1Image string,
@ -451,6 +452,7 @@ func NewMainKubelet(
kubecontainer.RealOS{},
imageBackOff,
serializeImagePulls,
runtimeRequestTimeout,
)
if err != nil {
return nil, err

View File

@ -79,7 +79,9 @@ func (c *Config) buildGlobalOptions() []string {
// that the fields in the provided config will override the
// result that get from the rkt api service.
func (r *Runtime) getConfig(cfg *Config) (*Config, error) {
resp, err := r.apisvc.GetInfo(context.Background(), &rktapi.GetInfoRequest{})
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
resp, err := r.apisvc.GetInfo(ctx, &rktapi.GetInfoRequest{})
if err != nil {
return nil, err
}

View File

@ -90,7 +90,9 @@ func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
// ListImages lists all the available appc images on the machine by invoking 'rkt image list'.
func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{})
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{})
if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err)
}
@ -155,7 +157,9 @@ func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error)
return nil, err
}
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{
Detail: detail,
Filters: []*rktapi.ImageFilter{
{
@ -231,7 +235,9 @@ func (r *Runtime) writeDockerAuthConfig(image string, credsSlice []credentialpro
// ImageStats returns the image stat (total storage bytes).
func (r *Runtime) ImageStats() (*kubecontainer.ImageStats, error) {
var imageStat kubecontainer.ImageStats
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{})
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{})
if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err)
}

View File

@ -128,6 +128,9 @@ const (
// under the CNI directory directly.
// See https://github.com/coreos/rkt/pull/2312#issuecomment-200068370.
defaultNetworkName = "rkt.kubernetes.io"
// defaultRequestTimeout is the default timeout of rkt requests.
defaultRequestTimeout = 2 * time.Minute
)
// Runtime implements the Containerruntime for rkt. The implementation
@ -166,6 +169,9 @@ type Runtime struct {
nsenterPath string
versions versions
// requestTimeout is the timeout of rkt requests.
requestTimeout time.Duration
}
var _ kubecontainer.Runtime = &Runtime{}
@ -200,6 +206,7 @@ func New(
os kubecontainer.OSInterface,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
requestTimeout time.Duration,
) (*Runtime, error) {
// Create dbus connection.
systemd, err := newSystemd()
@ -233,6 +240,10 @@ func New(
return nil, fmt.Errorf("cannot find nsenter binary: %v", err)
}
if requestTimeout == 0 {
requestTimeout = defaultRequestTimeout
}
rkt := &Runtime{
os: kubecontainer.RealOS{},
systemd: systemd,
@ -249,6 +260,7 @@ func New(
execer: execer,
touchPath: touchPath,
nsenterPath: nsenterPath,
requestTimeout: requestTimeout,
}
rkt.config, err = rkt.getConfig(rkt.config)
@ -585,7 +597,9 @@ func setApp(imgManifest *appcschema.ImageManifest, c *api.Container, opts *kubec
func (r *Runtime) makePodManifest(pod *api.Pod, podIP string, pullSecrets []api.Secret) (*appcschema.PodManifest, error) {
manifest := appcschema.BlankPodManifest()
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
Detail: true,
Filters: kubernetesPodFilters(pod.UID),
})
@ -1349,7 +1363,9 @@ func (r *Runtime) runPostStartHook(containerID kubecontainer.ContainerID, pod *a
}
isContainerRunning := func() (done bool, err error) {
resp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{Id: cid.uuid})
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
resp, err := r.apisvc.InspectPod(ctx, &rktapi.InspectPodRequest{Id: cid.uuid})
if err != nil {
return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod))
}
@ -1520,7 +1536,9 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
if !all {
listReq.Filters[0].States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, listReq)
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
@ -1829,7 +1847,9 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
return err
}
resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()})
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
resp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()})
if err != nil {
glog.Errorf("rkt: Failed to list pods: %v", err)
return err
@ -2047,7 +2067,9 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
Detail: true,
Filters: runningKubernetesPodFilters(pod.ID),
})
@ -2197,7 +2219,9 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube
Namespace: namespace,
}
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
Detail: true,
Filters: kubernetesPodFilters(uid),
})

View File

@ -73,9 +73,11 @@ func (r *Runtime) getVersions() error {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
// Example for the version strings returned by GetInfo():
// RktVersion:"0.10.0+gitb7349b1" AppcVersion:"0.7.1" ApiVersion:"1.0.0-alpha"
resp, err := r.apisvc.GetInfo(context.Background(), &rktapi.GetInfoRequest{})
resp, err := r.apisvc.GetInfo(ctx, &rktapi.GetInfoRequest{})
if err != nil {
return err
}

View File

@ -40,7 +40,7 @@ func NewConformanceImage(containerRuntime string, image string) (ci ConformanceI
//TODO: do not expose kubelet implementation details after we refactor the runtime API.
func dockerRuntime() kubecontainer.Runtime {
dockerClient := dockertools.ConnectToDockerOrDie("")
dockerClient := dockertools.ConnectToDockerOrDie("", 0)
pm := kubepod.NewBasicPodManager(nil)
dm := dockertools.NewDockerManager(
dockerClient,