mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-29 00:37:24 +00:00
agent: add default timeout for grpc requests
If guest is malfunctioning, we need a way to bail out. Add a default timeout for most of the grpc requests so that the runtime does not wait indefinitely. Fixes: #1952 Signed-off-by: Peng Tao <bergwolf@hyper.sh>
This commit is contained in:
parent
9ea469bcfa
commit
debc7d93ad
@ -54,6 +54,7 @@ const (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
checkRequestTimeout = 30 * time.Second
|
checkRequestTimeout = 30 * time.Second
|
||||||
|
defaultRequestTimeout = 60 * time.Second
|
||||||
defaultKataSocketName = "kata.sock"
|
defaultKataSocketName = "kata.sock"
|
||||||
defaultKataChannel = "agent.channel.0"
|
defaultKataChannel = "agent.channel.0"
|
||||||
defaultKataDeviceID = "channel0"
|
defaultKataDeviceID = "channel0"
|
||||||
@ -98,6 +99,38 @@ const (
|
|||||||
defaultAgentTraceType = agentTraceTypeIsolated
|
defaultAgentTraceType = agentTraceTypeIsolated
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
grpcCheckRequest = "grpc.CheckRequest"
|
||||||
|
grpcExecProcessRequest = "grpc.ExecProcessRequest"
|
||||||
|
grpcCreateSandboxRequest = "grpc.CreateSandboxRequest"
|
||||||
|
grpcDestroySandboxRequest = "grpc.DestroySandboxRequest"
|
||||||
|
grpcCreateContainerRequest = "grpc.CreateContainerRequest"
|
||||||
|
grpcStartContainerRequest = "grpc.StartContainerRequest"
|
||||||
|
grpcRemoveContainerRequest = "grpc.RemoveContainerRequest"
|
||||||
|
grpcSignalProcessRequest = "grpc.SignalProcessRequest"
|
||||||
|
grpcUpdateRoutesRequest = "grpc.UpdateRoutesRequest"
|
||||||
|
grpcUpdateInterfaceRequest = "grpc.UpdateInterfaceRequest"
|
||||||
|
grpcListInterfacesRequest = "grpc.ListInterfacesRequest"
|
||||||
|
grpcListRoutesRequest = "grpc.ListRoutesRequest"
|
||||||
|
grpcOnlineCPUMemRequest = "grpc.OnlineCPUMemRequest"
|
||||||
|
grpcListProcessesRequest = "grpc.ListProcessesRequest"
|
||||||
|
grpcUpdateContainerRequest = "grpc.UpdateContainerRequest"
|
||||||
|
grpcWaitProcessRequest = "grpc.WaitProcessRequest"
|
||||||
|
grpcTtyWinResizeRequest = "grpc.TtyWinResizeRequest"
|
||||||
|
grpcWriteStreamRequest = "grpc.WriteStreamRequest"
|
||||||
|
grpcCloseStdinRequest = "grpc.CloseStdinRequest"
|
||||||
|
grpcStatsContainerRequest = "grpc.StatsContainerRequest"
|
||||||
|
grpcPauseContainerRequest = "grpc.PauseContainerRequest"
|
||||||
|
grpcResumeContainerRequest = "grpc.ResumeContainerRequest"
|
||||||
|
grpcReseedRandomDevRequest = "grpc.ReseedRandomDevRequest"
|
||||||
|
grpcGuestDetailsRequest = "grpc.GuestDetailsRequest"
|
||||||
|
grpcMemHotplugByProbeRequest = "grpc.MemHotplugByProbeRequest"
|
||||||
|
grpcCopyFileRequest = "grpc.CopyFileRequest"
|
||||||
|
grpcSetGuestDateTimeRequest = "grpc.SetGuestDateTimeRequest"
|
||||||
|
grpcStartTracingRequest = "grpc.StartTracingRequest"
|
||||||
|
grpcStopTracingRequest = "grpc.StopTracingRequest"
|
||||||
|
)
|
||||||
|
|
||||||
// KataAgentConfig is a structure storing information needed
|
// KataAgentConfig is a structure storing information needed
|
||||||
// to reach the Kata Containers agent.
|
// to reach the Kata Containers agent.
|
||||||
type KataAgentConfig struct {
|
type KataAgentConfig struct {
|
||||||
@ -1741,97 +1774,109 @@ type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (inter
|
|||||||
|
|
||||||
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
||||||
k.reqHandlers = make(map[string]reqFunc)
|
k.reqHandlers = make(map[string]reqFunc)
|
||||||
k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, checkRequestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...)
|
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...)
|
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...)
|
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...)
|
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...)
|
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...)
|
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...)
|
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...)
|
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...)
|
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...)
|
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.ListInterfacesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest), opts...)
|
return k.client.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.ListRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.ListRoutes(ctx, req.(*grpc.ListRoutesRequest), opts...)
|
return k.client.ListRoutes(ctx, req.(*grpc.ListRoutesRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...)
|
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcListProcessesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
|
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.UpdateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest), opts...)
|
return k.client.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.WaitProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...)
|
return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
|
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
|
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
|
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.StatsContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.StatsContainer(ctx, req.(*grpc.StatsContainerRequest), opts...)
|
return k.client.StatsContainer(ctx, req.(*grpc.StatsContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.PauseContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.PauseContainer(ctx, req.(*grpc.PauseContainerRequest), opts...)
|
return k.client.PauseContainer(ctx, req.(*grpc.PauseContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.ResumeContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest), opts...)
|
return k.client.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.ReseedRandomDevRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest), opts...)
|
return k.client.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.GuestDetailsRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest), opts...)
|
return k.client.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.MemHotplugByProbeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest), opts...)
|
return k.client.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.CopyFileRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.CopyFile(ctx, req.(*grpc.CopyFileRequest), opts...)
|
return k.client.CopyFile(ctx, req.(*grpc.CopyFileRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.SetGuestDateTimeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest), opts...)
|
return k.client.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.StartTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcStartTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.StartTracing(ctx, req.(*grpc.StartTracingRequest), opts...)
|
return k.client.StartTracing(ctx, req.(*grpc.StartTracingRequest), opts...)
|
||||||
}
|
}
|
||||||
k.reqHandlers["grpc.StopTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
k.reqHandlers[grpcStopTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||||
return k.client.StopTracing(ctx, req.(*grpc.StopTracingRequest), opts...)
|
return k.client.StopTracing(ctx, req.(*grpc.StopTracingRequest), opts...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
|
||||||
|
ctx = context.Background()
|
||||||
|
switch reqName {
|
||||||
|
case grpcWaitProcessRequest:
|
||||||
|
// Wait has no timeout
|
||||||
|
case grpcCheckRequest:
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
|
||||||
|
default:
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, cancel
|
||||||
|
}
|
||||||
|
|
||||||
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
||||||
span, _ := k.trace("sendReq")
|
span, _ := k.trace("sendReq")
|
||||||
span.SetTag("request", request)
|
span.SetTag("request", request)
|
||||||
@ -1850,9 +1895,13 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
|||||||
return nil, errors.New("Invalid request type")
|
return nil, errors.New("Invalid request type")
|
||||||
}
|
}
|
||||||
message := request.(proto.Message)
|
message := request.(proto.Message)
|
||||||
|
ctx, cancel := k.getReqContext(msgName)
|
||||||
|
if cancel != nil {
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
k.Logger().WithField("name", msgName).WithField("req", message.String()).Debug("sending request")
|
k.Logger().WithField("name", msgName).WithField("req", message.String()).Debug("sending request")
|
||||||
|
|
||||||
return handler(k.ctx, request)
|
return handler(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// readStdout and readStderr are special that we cannot differentiate them with the request types...
|
// readStdout and readStderr are special that we cannot differentiate them with the request types...
|
||||||
|
Loading…
Reference in New Issue
Block a user