mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-09-18 07:18:27 +00:00
shimv2: fix the issue ttrpc server canceled context
This latest ttrpc vendor supports the feature of request timeout propgation. this feature will do context cancel after a service call return, and this cancel will propagated into kata sandbox's agent/hypervisor and resulted in the following calls canceled. To fix this issue, pass the service's context instead of the service's call's context to CreateSandbox(), and this context will live until the shim exited. Fixes:#1627 Signed-off-by: lifupan <lifupan@gmail.com>
This commit is contained in:
@@ -80,7 +80,12 @@ func create(ctx context.Context, s *service, r *taskAPI.CreateTaskRequest, netns
|
||||
rootFs.Mounted = s.mount
|
||||
|
||||
katautils.HandleFactory(ctx, vci, s.config)
|
||||
sandbox, _, err := katautils.CreateSandbox(ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false, true)
|
||||
|
||||
// Pass service's context instead of local ctx to CreateSandbox(), since local
|
||||
// ctx will be canceled after this rpc service call, but the sandbox will live
|
||||
// across multiple rpc service calls.
|
||||
//
|
||||
sandbox, _, err := katautils.CreateSandbox(s.ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -84,6 +84,7 @@ func TestCreateSandboxSuccess(t *testing.T) {
|
||||
id: testSandboxID,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
req := &taskAPI.CreateTaskRequest{
|
||||
@@ -129,6 +130,7 @@ func TestCreateSandboxFail(t *testing.T) {
|
||||
id: testSandboxID,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
req := &taskAPI.CreateTaskRequest{
|
||||
@@ -184,6 +186,7 @@ func TestCreateSandboxConfigFail(t *testing.T) {
|
||||
id: testSandboxID,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
req := &taskAPI.CreateTaskRequest{
|
||||
@@ -245,6 +248,7 @@ func TestCreateContainerSuccess(t *testing.T) {
|
||||
sandbox: sandbox,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
req := &taskAPI.CreateTaskRequest{
|
||||
@@ -291,6 +295,7 @@ func TestCreateContainerFail(t *testing.T) {
|
||||
id: testContainerID,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
req := &taskAPI.CreateTaskRequest{
|
||||
@@ -351,6 +356,7 @@ func TestCreateContainerConfigFail(t *testing.T) {
|
||||
sandbox: sandbox,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
req := &taskAPI.CreateTaskRequest{
|
||||
|
@@ -43,6 +43,10 @@ const (
|
||||
|
||||
chSize = 128
|
||||
exitCode255 = 255
|
||||
|
||||
// A time span used to wait for publish a containerd event,
|
||||
// once it costs a longer time than timeOut, it will be canceld.
|
||||
timeOut = 5 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -63,13 +67,16 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
||||
vci.SetLogger(ctx, logger)
|
||||
katautils.SetLogger(ctx, logger, logger.Logger.Level)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
s := &service{
|
||||
id: id,
|
||||
pid: uint32(os.Getpid()),
|
||||
context: ctx,
|
||||
ctx: ctx,
|
||||
containers: make(map[string]*container),
|
||||
events: make(chan interface{}, chSize),
|
||||
ec: make(chan exit, bufferSize),
|
||||
cancel: cancel,
|
||||
mount: false,
|
||||
}
|
||||
|
||||
@@ -102,12 +109,14 @@ type service struct {
|
||||
// will not do the rootfs mount.
|
||||
mount bool
|
||||
|
||||
context context.Context
|
||||
ctx context.Context
|
||||
sandbox vc.VCSandbox
|
||||
containers map[string]*container
|
||||
config *oci.RuntimeConfig
|
||||
events chan interface{}
|
||||
|
||||
cancel func()
|
||||
|
||||
ec chan exit
|
||||
id string
|
||||
}
|
||||
@@ -209,7 +218,10 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
|
||||
|
||||
func (s *service) forward(publisher events.Publisher) {
|
||||
for e := range s.events {
|
||||
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, timeOut)
|
||||
err := publisher.Publish(ctx, getTopic(e), e)
|
||||
cancel()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("post event")
|
||||
}
|
||||
}
|
||||
@@ -230,7 +242,7 @@ func (s *service) sendL(evt interface{}) {
|
||||
s.eventSendMu.Unlock()
|
||||
}
|
||||
|
||||
func getTopic(ctx context.Context, e interface{}) string {
|
||||
func getTopic(e interface{}) string {
|
||||
switch e.(type) {
|
||||
case *eventstypes.TaskCreate:
|
||||
return cdruntime.TaskCreateEventTopic
|
||||
@@ -766,6 +778,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
s.cancel()
|
||||
|
||||
os.Exit(0)
|
||||
|
||||
// This will never be called, but this is only there to make sure the
|
||||
|
Reference in New Issue
Block a user