mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-09-24 10:17:21 +00:00
Merge pull request #1628 from lifupan/fixctx
shimv2: fix the issue ttrpc server canceled context
This commit is contained in:
@@ -80,7 +80,12 @@ func create(ctx context.Context, s *service, r *taskAPI.CreateTaskRequest, netns
|
|||||||
rootFs.Mounted = s.mount
|
rootFs.Mounted = s.mount
|
||||||
|
|
||||||
katautils.HandleFactory(ctx, vci, s.config)
|
katautils.HandleFactory(ctx, vci, s.config)
|
||||||
sandbox, _, err := katautils.CreateSandbox(ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false, true)
|
|
||||||
|
// Pass service's context instead of local ctx to CreateSandbox(), since local
|
||||||
|
// ctx will be canceled after this rpc service call, but the sandbox will live
|
||||||
|
// across multiple rpc service calls.
|
||||||
|
//
|
||||||
|
sandbox, _, err := katautils.CreateSandbox(s.ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -84,6 +84,7 @@ func TestCreateSandboxSuccess(t *testing.T) {
|
|||||||
id: testSandboxID,
|
id: testSandboxID,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
config: &runtimeConfig,
|
config: &runtimeConfig,
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &taskAPI.CreateTaskRequest{
|
req := &taskAPI.CreateTaskRequest{
|
||||||
@@ -129,6 +130,7 @@ func TestCreateSandboxFail(t *testing.T) {
|
|||||||
id: testSandboxID,
|
id: testSandboxID,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
config: &runtimeConfig,
|
config: &runtimeConfig,
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &taskAPI.CreateTaskRequest{
|
req := &taskAPI.CreateTaskRequest{
|
||||||
@@ -184,6 +186,7 @@ func TestCreateSandboxConfigFail(t *testing.T) {
|
|||||||
id: testSandboxID,
|
id: testSandboxID,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
config: &runtimeConfig,
|
config: &runtimeConfig,
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &taskAPI.CreateTaskRequest{
|
req := &taskAPI.CreateTaskRequest{
|
||||||
@@ -245,6 +248,7 @@ func TestCreateContainerSuccess(t *testing.T) {
|
|||||||
sandbox: sandbox,
|
sandbox: sandbox,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
config: &runtimeConfig,
|
config: &runtimeConfig,
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &taskAPI.CreateTaskRequest{
|
req := &taskAPI.CreateTaskRequest{
|
||||||
@@ -291,6 +295,7 @@ func TestCreateContainerFail(t *testing.T) {
|
|||||||
id: testContainerID,
|
id: testContainerID,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
config: &runtimeConfig,
|
config: &runtimeConfig,
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &taskAPI.CreateTaskRequest{
|
req := &taskAPI.CreateTaskRequest{
|
||||||
@@ -351,6 +356,7 @@ func TestCreateContainerConfigFail(t *testing.T) {
|
|||||||
sandbox: sandbox,
|
sandbox: sandbox,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
config: &runtimeConfig,
|
config: &runtimeConfig,
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &taskAPI.CreateTaskRequest{
|
req := &taskAPI.CreateTaskRequest{
|
||||||
|
@@ -43,6 +43,10 @@ const (
|
|||||||
|
|
||||||
chSize = 128
|
chSize = 128
|
||||||
exitCode255 = 255
|
exitCode255 = 255
|
||||||
|
|
||||||
|
// A time span used to wait for publish a containerd event,
|
||||||
|
// once it costs a longer time than timeOut, it will be canceld.
|
||||||
|
timeOut = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -63,13 +67,16 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
|||||||
vci.SetLogger(ctx, logger)
|
vci.SetLogger(ctx, logger)
|
||||||
katautils.SetLogger(ctx, logger, logger.Logger.Level)
|
katautils.SetLogger(ctx, logger, logger.Logger.Level)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
s := &service{
|
s := &service{
|
||||||
id: id,
|
id: id,
|
||||||
pid: uint32(os.Getpid()),
|
pid: uint32(os.Getpid()),
|
||||||
context: ctx,
|
ctx: ctx,
|
||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
events: make(chan interface{}, chSize),
|
events: make(chan interface{}, chSize),
|
||||||
ec: make(chan exit, bufferSize),
|
ec: make(chan exit, bufferSize),
|
||||||
|
cancel: cancel,
|
||||||
mount: false,
|
mount: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,12 +109,14 @@ type service struct {
|
|||||||
// will not do the rootfs mount.
|
// will not do the rootfs mount.
|
||||||
mount bool
|
mount bool
|
||||||
|
|
||||||
context context.Context
|
ctx context.Context
|
||||||
sandbox vc.VCSandbox
|
sandbox vc.VCSandbox
|
||||||
containers map[string]*container
|
containers map[string]*container
|
||||||
config *oci.RuntimeConfig
|
config *oci.RuntimeConfig
|
||||||
events chan interface{}
|
events chan interface{}
|
||||||
|
|
||||||
|
cancel func()
|
||||||
|
|
||||||
ec chan exit
|
ec chan exit
|
||||||
id string
|
id string
|
||||||
}
|
}
|
||||||
@@ -209,7 +218,10 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
|
|||||||
|
|
||||||
func (s *service) forward(publisher events.Publisher) {
|
func (s *service) forward(publisher events.Publisher) {
|
||||||
for e := range s.events {
|
for e := range s.events {
|
||||||
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
|
ctx, cancel := context.WithTimeout(s.ctx, timeOut)
|
||||||
|
err := publisher.Publish(ctx, getTopic(e), e)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
logrus.WithError(err).Error("post event")
|
logrus.WithError(err).Error("post event")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -230,7 +242,7 @@ func (s *service) sendL(evt interface{}) {
|
|||||||
s.eventSendMu.Unlock()
|
s.eventSendMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTopic(ctx context.Context, e interface{}) string {
|
func getTopic(e interface{}) string {
|
||||||
switch e.(type) {
|
switch e.(type) {
|
||||||
case *eventstypes.TaskCreate:
|
case *eventstypes.TaskCreate:
|
||||||
return cdruntime.TaskCreateEventTopic
|
return cdruntime.TaskCreateEventTopic
|
||||||
@@ -766,6 +778,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
|||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
s.cancel()
|
||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
|
||||||
// This will never be called, but this is only there to make sure the
|
// This will never be called, but this is only there to make sure the
|
||||||
|
Reference in New Issue
Block a user