From debc7d93ad623f7475a41ddef11bcfcd3560bb65 Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Wed, 7 Aug 2019 11:41:08 +0800 Subject: [PATCH] agent: add default timeout for grpc requests If guest is malfunctioning, we need a way to bail out. Add a default timeout for most of the grpc requests so that the runtime does not wait indefinitely. Fixes: #1952 Signed-off-by: Peng Tao --- virtcontainers/kata_agent.go | 113 +++++++++++++++++++++++++---------- 1 file changed, 81 insertions(+), 32 deletions(-) diff --git a/virtcontainers/kata_agent.go b/virtcontainers/kata_agent.go index f9443b5b0d..254b8211ae 100644 --- a/virtcontainers/kata_agent.go +++ b/virtcontainers/kata_agent.go @@ -54,6 +54,7 @@ const ( var ( checkRequestTimeout = 30 * time.Second + defaultRequestTimeout = 60 * time.Second defaultKataSocketName = "kata.sock" defaultKataChannel = "agent.channel.0" defaultKataDeviceID = "channel0" @@ -98,6 +99,38 @@ const ( defaultAgentTraceType = agentTraceTypeIsolated ) +const ( + grpcCheckRequest = "grpc.CheckRequest" + grpcExecProcessRequest = "grpc.ExecProcessRequest" + grpcCreateSandboxRequest = "grpc.CreateSandboxRequest" + grpcDestroySandboxRequest = "grpc.DestroySandboxRequest" + grpcCreateContainerRequest = "grpc.CreateContainerRequest" + grpcStartContainerRequest = "grpc.StartContainerRequest" + grpcRemoveContainerRequest = "grpc.RemoveContainerRequest" + grpcSignalProcessRequest = "grpc.SignalProcessRequest" + grpcUpdateRoutesRequest = "grpc.UpdateRoutesRequest" + grpcUpdateInterfaceRequest = "grpc.UpdateInterfaceRequest" + grpcListInterfacesRequest = "grpc.ListInterfacesRequest" + grpcListRoutesRequest = "grpc.ListRoutesRequest" + grpcOnlineCPUMemRequest = "grpc.OnlineCPUMemRequest" + grpcListProcessesRequest = "grpc.ListProcessesRequest" + grpcUpdateContainerRequest = "grpc.UpdateContainerRequest" + grpcWaitProcessRequest = "grpc.WaitProcessRequest" + grpcTtyWinResizeRequest = "grpc.TtyWinResizeRequest" + grpcWriteStreamRequest = "grpc.WriteStreamRequest" + grpcCloseStdinRequest = "grpc.CloseStdinRequest" + grpcStatsContainerRequest = "grpc.StatsContainerRequest" + grpcPauseContainerRequest = "grpc.PauseContainerRequest" + grpcResumeContainerRequest = "grpc.ResumeContainerRequest" + grpcReseedRandomDevRequest = "grpc.ReseedRandomDevRequest" + grpcGuestDetailsRequest = "grpc.GuestDetailsRequest" + grpcMemHotplugByProbeRequest = "grpc.MemHotplugByProbeRequest" + grpcCopyFileRequest = "grpc.CopyFileRequest" + grpcSetGuestDateTimeRequest = "grpc.SetGuestDateTimeRequest" + grpcStartTracingRequest = "grpc.StartTracingRequest" + grpcStopTracingRequest = "grpc.StopTracingRequest" +) + // KataAgentConfig is a structure storing information needed // to reach the Kata Containers agent. type KataAgentConfig struct { @@ -1741,97 +1774,109 @@ type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (inter func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { k.reqHandlers = make(map[string]reqFunc) - k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { - ctx, cancel := context.WithTimeout(ctx, checkRequestTimeout) - defer cancel() + k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...) } - k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...) } - k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...) } - k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...) } - k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...) } - k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...) } - k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...) } - k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...) } - k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...) } - k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...) } - k.reqHandlers["grpc.ListInterfacesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest), opts...) } - k.reqHandlers["grpc.ListRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.ListRoutes(ctx, req.(*grpc.ListRoutesRequest), opts...) } - k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...) } - k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcListProcessesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...) } - k.reqHandlers["grpc.UpdateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest), opts...) } - k.reqHandlers["grpc.WaitProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...) } - k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...) } - k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...) } - k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...) } - k.reqHandlers["grpc.StatsContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.StatsContainer(ctx, req.(*grpc.StatsContainerRequest), opts...) } - k.reqHandlers["grpc.PauseContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.PauseContainer(ctx, req.(*grpc.PauseContainerRequest), opts...) } - k.reqHandlers["grpc.ResumeContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest), opts...) } - k.reqHandlers["grpc.ReseedRandomDevRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest), opts...) } - k.reqHandlers["grpc.GuestDetailsRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest), opts...) } - k.reqHandlers["grpc.MemHotplugByProbeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest), opts...) } - k.reqHandlers["grpc.CopyFileRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.CopyFile(ctx, req.(*grpc.CopyFileRequest), opts...) } - k.reqHandlers["grpc.SetGuestDateTimeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest), opts...) } - k.reqHandlers["grpc.StartTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcStartTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.StartTracing(ctx, req.(*grpc.StartTracingRequest), opts...) } - k.reqHandlers["grpc.StopTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + k.reqHandlers[grpcStopTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.StopTracing(ctx, req.(*grpc.StopTracingRequest), opts...) } } +func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) { + ctx = context.Background() + switch reqName { + case grpcWaitProcessRequest: + // Wait has no timeout + case grpcCheckRequest: + ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout) + default: + ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout) + } + + return ctx, cancel +} + func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { span, _ := k.trace("sendReq") span.SetTag("request", request) @@ -1850,9 +1895,13 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { return nil, errors.New("Invalid request type") } message := request.(proto.Message) + ctx, cancel := k.getReqContext(msgName) + if cancel != nil { + defer cancel() + } k.Logger().WithField("name", msgName).WithField("req", message.String()).Debug("sending request") - return handler(k.ctx, request) + return handler(ctx, request) } // readStdout and readStderr are special that we cannot differentiate them with the request types...