mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-28 16:27:50 +00:00
agent: add default timeout for grpc requests
If guest is malfunctioning, we need a way to bail out. Add a default timeout for most of the grpc requests so that the runtime does not wait indefinitely. Fixes: #1952 Signed-off-by: Peng Tao <bergwolf@hyper.sh>
This commit is contained in:
parent
9ea469bcfa
commit
debc7d93ad
@ -54,6 +54,7 @@ const (
|
||||
|
||||
var (
|
||||
checkRequestTimeout = 30 * time.Second
|
||||
defaultRequestTimeout = 60 * time.Second
|
||||
defaultKataSocketName = "kata.sock"
|
||||
defaultKataChannel = "agent.channel.0"
|
||||
defaultKataDeviceID = "channel0"
|
||||
@ -98,6 +99,38 @@ const (
|
||||
defaultAgentTraceType = agentTraceTypeIsolated
|
||||
)
|
||||
|
||||
const (
|
||||
grpcCheckRequest = "grpc.CheckRequest"
|
||||
grpcExecProcessRequest = "grpc.ExecProcessRequest"
|
||||
grpcCreateSandboxRequest = "grpc.CreateSandboxRequest"
|
||||
grpcDestroySandboxRequest = "grpc.DestroySandboxRequest"
|
||||
grpcCreateContainerRequest = "grpc.CreateContainerRequest"
|
||||
grpcStartContainerRequest = "grpc.StartContainerRequest"
|
||||
grpcRemoveContainerRequest = "grpc.RemoveContainerRequest"
|
||||
grpcSignalProcessRequest = "grpc.SignalProcessRequest"
|
||||
grpcUpdateRoutesRequest = "grpc.UpdateRoutesRequest"
|
||||
grpcUpdateInterfaceRequest = "grpc.UpdateInterfaceRequest"
|
||||
grpcListInterfacesRequest = "grpc.ListInterfacesRequest"
|
||||
grpcListRoutesRequest = "grpc.ListRoutesRequest"
|
||||
grpcOnlineCPUMemRequest = "grpc.OnlineCPUMemRequest"
|
||||
grpcListProcessesRequest = "grpc.ListProcessesRequest"
|
||||
grpcUpdateContainerRequest = "grpc.UpdateContainerRequest"
|
||||
grpcWaitProcessRequest = "grpc.WaitProcessRequest"
|
||||
grpcTtyWinResizeRequest = "grpc.TtyWinResizeRequest"
|
||||
grpcWriteStreamRequest = "grpc.WriteStreamRequest"
|
||||
grpcCloseStdinRequest = "grpc.CloseStdinRequest"
|
||||
grpcStatsContainerRequest = "grpc.StatsContainerRequest"
|
||||
grpcPauseContainerRequest = "grpc.PauseContainerRequest"
|
||||
grpcResumeContainerRequest = "grpc.ResumeContainerRequest"
|
||||
grpcReseedRandomDevRequest = "grpc.ReseedRandomDevRequest"
|
||||
grpcGuestDetailsRequest = "grpc.GuestDetailsRequest"
|
||||
grpcMemHotplugByProbeRequest = "grpc.MemHotplugByProbeRequest"
|
||||
grpcCopyFileRequest = "grpc.CopyFileRequest"
|
||||
grpcSetGuestDateTimeRequest = "grpc.SetGuestDateTimeRequest"
|
||||
grpcStartTracingRequest = "grpc.StartTracingRequest"
|
||||
grpcStopTracingRequest = "grpc.StopTracingRequest"
|
||||
)
|
||||
|
||||
// KataAgentConfig is a structure storing information needed
|
||||
// to reach the Kata Containers agent.
|
||||
type KataAgentConfig struct {
|
||||
@ -1741,97 +1774,109 @@ type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (inter
|
||||
|
||||
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
||||
k.reqHandlers = make(map[string]reqFunc)
|
||||
k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, checkRequestTimeout)
|
||||
defer cancel()
|
||||
k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.ListInterfacesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.ListRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ListRoutes(ctx, req.(*grpc.ListRoutesRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcListProcessesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.UpdateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.WaitProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.StatsContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.StatsContainer(ctx, req.(*grpc.StatsContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.PauseContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.PauseContainer(ctx, req.(*grpc.PauseContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.ResumeContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.ReseedRandomDevRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.GuestDetailsRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.MemHotplugByProbeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.CopyFileRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.CopyFile(ctx, req.(*grpc.CopyFileRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.SetGuestDateTimeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.StartTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcStartTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.StartTracing(ctx, req.(*grpc.StartTracingRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.StopTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
k.reqHandlers[grpcStopTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.StopTracing(ctx, req.(*grpc.StopTracingRequest), opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
|
||||
ctx = context.Background()
|
||||
switch reqName {
|
||||
case grpcWaitProcessRequest:
|
||||
// Wait has no timeout
|
||||
case grpcCheckRequest:
|
||||
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
|
||||
default:
|
||||
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
|
||||
}
|
||||
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
||||
span, _ := k.trace("sendReq")
|
||||
span.SetTag("request", request)
|
||||
@ -1850,9 +1895,13 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
||||
return nil, errors.New("Invalid request type")
|
||||
}
|
||||
message := request.(proto.Message)
|
||||
ctx, cancel := k.getReqContext(msgName)
|
||||
if cancel != nil {
|
||||
defer cancel()
|
||||
}
|
||||
k.Logger().WithField("name", msgName).WithField("req", message.String()).Debug("sending request")
|
||||
|
||||
return handler(k.ctx, request)
|
||||
return handler(ctx, request)
|
||||
}
|
||||
|
||||
// readStdout and readStderr are special that we cannot differentiate them with the request types...
|
||||
|
Loading…
Reference in New Issue
Block a user