From 3cc9ca3988c8c6797a532ce0a75c350738fd9a9d Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Fri, 17 Jun 2016 14:28:30 -0700 Subject: [PATCH] Add timeout for rkt requests. --- cmd/kubelet/app/server.go | 3 +++ pkg/kubelet/kubelet.go | 2 ++ pkg/kubelet/rkt/config.go | 4 +++- pkg/kubelet/rkt/image.go | 12 +++++++++--- pkg/kubelet/rkt/rkt.go | 36 ++++++++++++++++++++++++++++++------ pkg/kubelet/rkt/version.go | 4 +++- 6 files changed, 50 insertions(+), 11 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 72e69e311e0..83694d4657e 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -210,6 +210,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { ConfigureCBR0: s.ConfigureCBR0, ContainerManager: nil, ContainerRuntime: s.ContainerRuntime, + RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration, CPUCFSQuota: s.CPUCFSQuota, DiskSpacePolicy: diskSpacePolicy, DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt. @@ -781,6 +782,7 @@ type KubeletConfig struct { ConfigureCBR0 bool ContainerManager cm.ContainerManager ContainerRuntime string + RuntimeRequestTimeout time.Duration CPUCFSQuota bool DiskSpacePolicy kubelet.DiskSpacePolicy DockerClient dockertools.DockerInterface @@ -921,6 +923,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.OSInterface, kc.CgroupRoot, kc.ContainerRuntime, + kc.RuntimeRequestTimeout, kc.RktPath, kc.RktAPIEndpoint, kc.RktStage1Image, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d5ea19373cf..7870262b200 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -206,6 +206,7 @@ func NewMainKubelet( osInterface kubecontainer.OSInterface, cgroupRoot string, containerRuntime string, + runtimeRequestTimeout time.Duration, rktPath string, rktAPIEndpoint string, rktStage1Image string, @@ -451,6 +452,7 @@ func NewMainKubelet( kubecontainer.RealOS{}, imageBackOff, serializeImagePulls, + runtimeRequestTimeout, ) if err != nil { return nil, err diff --git a/pkg/kubelet/rkt/config.go b/pkg/kubelet/rkt/config.go index 809eefc5431..d978847f080 100644 --- a/pkg/kubelet/rkt/config.go +++ b/pkg/kubelet/rkt/config.go @@ -79,7 +79,9 @@ func (c *Config) buildGlobalOptions() []string { // that the fields in the provided config will override the // result that get from the rkt api service. func (r *Runtime) getConfig(cfg *Config) (*Config, error) { - resp, err := r.apisvc.GetInfo(context.Background(), &rktapi.GetInfoRequest{}) + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + resp, err := r.apisvc.GetInfo(ctx, &rktapi.GetInfoRequest{}) if err != nil { return nil, err } diff --git a/pkg/kubelet/rkt/image.go b/pkg/kubelet/rkt/image.go index afdbe94ee92..0cad258c4fb 100644 --- a/pkg/kubelet/rkt/image.go +++ b/pkg/kubelet/rkt/image.go @@ -90,7 +90,9 @@ func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) { // ListImages lists all the available appc images on the machine by invoking 'rkt image list'. func (r *Runtime) ListImages() ([]kubecontainer.Image, error) { - listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{}) + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{}) if err != nil { return nil, fmt.Errorf("couldn't list images: %v", err) } @@ -155,7 +157,9 @@ func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error) return nil, err } - listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{ + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{ Detail: detail, Filters: []*rktapi.ImageFilter{ { @@ -231,7 +235,9 @@ func (r *Runtime) writeDockerAuthConfig(image string, credsSlice []credentialpro // ImageStats returns the image stat (total storage bytes). func (r *Runtime) ImageStats() (*kubecontainer.ImageStats, error) { var imageStat kubecontainer.ImageStats - listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{}) + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{}) if err != nil { return nil, fmt.Errorf("couldn't list images: %v", err) } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index f50e987f717..4a449fe7e98 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -128,6 +128,9 @@ const ( // under the CNI directory directly. // See https://github.com/coreos/rkt/pull/2312#issuecomment-200068370. defaultNetworkName = "rkt.kubernetes.io" + + // defaultRequestTimeout is the default timeout of rkt requests. + defaultRequestTimeout = 2 * time.Minute ) // Runtime implements the Containerruntime for rkt. The implementation @@ -166,6 +169,9 @@ type Runtime struct { nsenterPath string versions versions + + // requestTimeout is the timeout of rkt requests. + requestTimeout time.Duration } var _ kubecontainer.Runtime = &Runtime{} @@ -200,6 +206,7 @@ func New( os kubecontainer.OSInterface, imageBackOff *flowcontrol.Backoff, serializeImagePulls bool, + requestTimeout time.Duration, ) (*Runtime, error) { // Create dbus connection. systemd, err := newSystemd() @@ -233,6 +240,10 @@ func New( return nil, fmt.Errorf("cannot find nsenter binary: %v", err) } + if requestTimeout == 0 { + requestTimeout = defaultRequestTimeout + } + rkt := &Runtime{ os: kubecontainer.RealOS{}, systemd: systemd, @@ -249,6 +260,7 @@ func New( execer: execer, touchPath: touchPath, nsenterPath: nsenterPath, + requestTimeout: requestTimeout, } rkt.config, err = rkt.getConfig(rkt.config) @@ -585,7 +597,9 @@ func setApp(imgManifest *appcschema.ImageManifest, c *api.Container, opts *kubec func (r *Runtime) makePodManifest(pod *api.Pod, podIP string, pullSecrets []api.Secret) (*appcschema.PodManifest, error) { manifest := appcschema.BlankPodManifest() - listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{ Detail: true, Filters: kubernetesPodFilters(pod.UID), }) @@ -1349,7 +1363,9 @@ func (r *Runtime) runPostStartHook(containerID kubecontainer.ContainerID, pod *a } isContainerRunning := func() (done bool, err error) { - resp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{Id: cid.uuid}) + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + resp, err := r.apisvc.InspectPod(ctx, &rktapi.InspectPodRequest{Id: cid.uuid}) if err != nil { return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod)) } @@ -1520,7 +1536,9 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { if !all { listReq.Filters[0].States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING} } - listResp, err := r.apisvc.ListPods(context.Background(), listReq) + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListPods(ctx, listReq) if err != nil { return nil, fmt.Errorf("couldn't list pods: %v", err) } @@ -1829,7 +1847,9 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo return err } - resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()}) + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + resp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()}) if err != nil { glog.Errorf("rkt: Failed to list pods: %v", err) return err @@ -2047,7 +2067,9 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { glog.V(4).Infof("Rkt port forwarding in container.") - listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{ Detail: true, Filters: runningKubernetesPodFilters(pod.ID), }) @@ -2197,7 +2219,9 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube Namespace: namespace, } - listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() + listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{ Detail: true, Filters: kubernetesPodFilters(uid), }) diff --git a/pkg/kubelet/rkt/version.go b/pkg/kubelet/rkt/version.go index 9144cf2ff33..2cf7a634616 100644 --- a/pkg/kubelet/rkt/version.go +++ b/pkg/kubelet/rkt/version.go @@ -73,9 +73,11 @@ func (r *Runtime) getVersions() error { return err } + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() // Example for the version strings returned by GetInfo(): // RktVersion:"0.10.0+gitb7349b1" AppcVersion:"0.7.1" ApiVersion:"1.0.0-alpha" - resp, err := r.apisvc.GetInfo(context.Background(), &rktapi.GetInfoRequest{}) + resp, err := r.apisvc.GetInfo(ctx, &rktapi.GetInfoRequest{}) if err != nil { return err }