Add timeout for rkt requests.

This commit is contained in:
Random-Liu 2016-06-17 14:28:30 -07:00
parent 52ebd4ecf1
commit 3cc9ca3988
6 changed files with 50 additions and 11 deletions

View File

@ -210,6 +210,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
ConfigureCBR0: s.ConfigureCBR0, ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil, ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime, ContainerRuntime: s.ContainerRuntime,
RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
CPUCFSQuota: s.CPUCFSQuota, CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy, DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt. DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
@ -781,6 +782,7 @@ type KubeletConfig struct {
ConfigureCBR0 bool ConfigureCBR0 bool
ContainerManager cm.ContainerManager ContainerManager cm.ContainerManager
ContainerRuntime string ContainerRuntime string
RuntimeRequestTimeout time.Duration
CPUCFSQuota bool CPUCFSQuota bool
DiskSpacePolicy kubelet.DiskSpacePolicy DiskSpacePolicy kubelet.DiskSpacePolicy
DockerClient dockertools.DockerInterface DockerClient dockertools.DockerInterface
@ -921,6 +923,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.OSInterface, kc.OSInterface,
kc.CgroupRoot, kc.CgroupRoot,
kc.ContainerRuntime, kc.ContainerRuntime,
kc.RuntimeRequestTimeout,
kc.RktPath, kc.RktPath,
kc.RktAPIEndpoint, kc.RktAPIEndpoint,
kc.RktStage1Image, kc.RktStage1Image,

View File

@ -206,6 +206,7 @@ func NewMainKubelet(
osInterface kubecontainer.OSInterface, osInterface kubecontainer.OSInterface,
cgroupRoot string, cgroupRoot string,
containerRuntime string, containerRuntime string,
runtimeRequestTimeout time.Duration,
rktPath string, rktPath string,
rktAPIEndpoint string, rktAPIEndpoint string,
rktStage1Image string, rktStage1Image string,
@ -451,6 +452,7 @@ func NewMainKubelet(
kubecontainer.RealOS{}, kubecontainer.RealOS{},
imageBackOff, imageBackOff,
serializeImagePulls, serializeImagePulls,
runtimeRequestTimeout,
) )
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -79,7 +79,9 @@ func (c *Config) buildGlobalOptions() []string {
// that the fields in the provided config will override the // that the fields in the provided config will override the
// result that get from the rkt api service. // result that get from the rkt api service.
func (r *Runtime) getConfig(cfg *Config) (*Config, error) { func (r *Runtime) getConfig(cfg *Config) (*Config, error) {
resp, err := r.apisvc.GetInfo(context.Background(), &rktapi.GetInfoRequest{}) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
resp, err := r.apisvc.GetInfo(ctx, &rktapi.GetInfoRequest{})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -90,7 +90,9 @@ func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
// ListImages lists all the available appc images on the machine by invoking 'rkt image list'. // ListImages lists all the available appc images on the machine by invoking 'rkt image list'.
func (r *Runtime) ListImages() ([]kubecontainer.Image, error) { func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{}) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{})
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err) return nil, fmt.Errorf("couldn't list images: %v", err)
} }
@ -155,7 +157,9 @@ func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error)
return nil, err return nil, err
} }
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{ ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{
Detail: detail, Detail: detail,
Filters: []*rktapi.ImageFilter{ Filters: []*rktapi.ImageFilter{
{ {
@ -231,7 +235,9 @@ func (r *Runtime) writeDockerAuthConfig(image string, credsSlice []credentialpro
// ImageStats returns the image stat (total storage bytes). // ImageStats returns the image stat (total storage bytes).
func (r *Runtime) ImageStats() (*kubecontainer.ImageStats, error) { func (r *Runtime) ImageStats() (*kubecontainer.ImageStats, error) {
var imageStat kubecontainer.ImageStats var imageStat kubecontainer.ImageStats
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{}) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{})
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err) return nil, fmt.Errorf("couldn't list images: %v", err)
} }

View File

@ -128,6 +128,9 @@ const (
// under the CNI directory directly. // under the CNI directory directly.
// See https://github.com/coreos/rkt/pull/2312#issuecomment-200068370. // See https://github.com/coreos/rkt/pull/2312#issuecomment-200068370.
defaultNetworkName = "rkt.kubernetes.io" defaultNetworkName = "rkt.kubernetes.io"
// defaultRequestTimeout is the default timeout of rkt requests.
defaultRequestTimeout = 2 * time.Minute
) )
// Runtime implements the Containerruntime for rkt. The implementation // Runtime implements the Containerruntime for rkt. The implementation
@ -166,6 +169,9 @@ type Runtime struct {
nsenterPath string nsenterPath string
versions versions versions versions
// requestTimeout is the timeout of rkt requests.
requestTimeout time.Duration
} }
var _ kubecontainer.Runtime = &Runtime{} var _ kubecontainer.Runtime = &Runtime{}
@ -200,6 +206,7 @@ func New(
os kubecontainer.OSInterface, os kubecontainer.OSInterface,
imageBackOff *flowcontrol.Backoff, imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool, serializeImagePulls bool,
requestTimeout time.Duration,
) (*Runtime, error) { ) (*Runtime, error) {
// Create dbus connection. // Create dbus connection.
systemd, err := newSystemd() systemd, err := newSystemd()
@ -233,6 +240,10 @@ func New(
return nil, fmt.Errorf("cannot find nsenter binary: %v", err) return nil, fmt.Errorf("cannot find nsenter binary: %v", err)
} }
if requestTimeout == 0 {
requestTimeout = defaultRequestTimeout
}
rkt := &Runtime{ rkt := &Runtime{
os: kubecontainer.RealOS{}, os: kubecontainer.RealOS{},
systemd: systemd, systemd: systemd,
@ -249,6 +260,7 @@ func New(
execer: execer, execer: execer,
touchPath: touchPath, touchPath: touchPath,
nsenterPath: nsenterPath, nsenterPath: nsenterPath,
requestTimeout: requestTimeout,
} }
rkt.config, err = rkt.getConfig(rkt.config) rkt.config, err = rkt.getConfig(rkt.config)
@ -585,7 +597,9 @@ func setApp(imgManifest *appcschema.ImageManifest, c *api.Container, opts *kubec
func (r *Runtime) makePodManifest(pod *api.Pod, podIP string, pullSecrets []api.Secret) (*appcschema.PodManifest, error) { func (r *Runtime) makePodManifest(pod *api.Pod, podIP string, pullSecrets []api.Secret) (*appcschema.PodManifest, error) {
manifest := appcschema.BlankPodManifest() manifest := appcschema.BlankPodManifest()
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
Detail: true, Detail: true,
Filters: kubernetesPodFilters(pod.UID), Filters: kubernetesPodFilters(pod.UID),
}) })
@ -1349,7 +1363,9 @@ func (r *Runtime) runPostStartHook(containerID kubecontainer.ContainerID, pod *a
} }
isContainerRunning := func() (done bool, err error) { isContainerRunning := func() (done bool, err error) {
resp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{Id: cid.uuid}) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
resp, err := r.apisvc.InspectPod(ctx, &rktapi.InspectPodRequest{Id: cid.uuid})
if err != nil { if err != nil {
return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod)) return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod))
} }
@ -1520,7 +1536,9 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
if !all { if !all {
listReq.Filters[0].States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING} listReq.Filters[0].States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
} }
listResp, err := r.apisvc.ListPods(context.Background(), listReq) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, listReq)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err) return nil, fmt.Errorf("couldn't list pods: %v", err)
} }
@ -1829,7 +1847,9 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
return err return err
} }
resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()}) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
resp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()})
if err != nil { if err != nil {
glog.Errorf("rkt: Failed to list pods: %v", err) glog.Errorf("rkt: Failed to list pods: %v", err)
return err return err
@ -2047,7 +2067,9 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.") glog.V(4).Infof("Rkt port forwarding in container.")
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
Detail: true, Detail: true,
Filters: runningKubernetesPodFilters(pod.ID), Filters: runningKubernetesPodFilters(pod.ID),
}) })
@ -2197,7 +2219,9 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube
Namespace: namespace, Namespace: namespace,
} }
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
Detail: true, Detail: true,
Filters: kubernetesPodFilters(uid), Filters: kubernetesPodFilters(uid),
}) })

View File

@ -73,9 +73,11 @@ func (r *Runtime) getVersions() error {
return err return err
} }
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
// Example for the version strings returned by GetInfo(): // Example for the version strings returned by GetInfo():
// RktVersion:"0.10.0+gitb7349b1" AppcVersion:"0.7.1" ApiVersion:"1.0.0-alpha" // RktVersion:"0.10.0+gitb7349b1" AppcVersion:"0.7.1" ApiVersion:"1.0.0-alpha"
resp, err := r.apisvc.GetInfo(context.Background(), &rktapi.GetInfoRequest{}) resp, err := r.apisvc.GetInfo(ctx, &rktapi.GetInfoRequest{})
if err != nil { if err != nil {
return err return err
} }