diff --git a/containerd-shim-v2/create.go b/containerd-shim-v2/create.go index c30047fcca..6ddafc36d1 100644 --- a/containerd-shim-v2/create.go +++ b/containerd-shim-v2/create.go @@ -80,7 +80,12 @@ func create(ctx context.Context, s *service, r *taskAPI.CreateTaskRequest, netns rootFs.Mounted = s.mount katautils.HandleFactory(ctx, vci, s.config) - sandbox, _, err := katautils.CreateSandbox(ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false, true) + + // Pass service's context instead of local ctx to CreateSandbox(), since local + // ctx will be canceled after this rpc service call, but the sandbox will live + // across multiple rpc service calls. + // + sandbox, _, err := katautils.CreateSandbox(s.ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false, true) if err != nil { return nil, err } diff --git a/containerd-shim-v2/create_test.go b/containerd-shim-v2/create_test.go index 53be90d503..eb5b67ad3c 100644 --- a/containerd-shim-v2/create_test.go +++ b/containerd-shim-v2/create_test.go @@ -84,6 +84,7 @@ func TestCreateSandboxSuccess(t *testing.T) { id: testSandboxID, containers: make(map[string]*container), config: &runtimeConfig, + ctx: context.Background(), } req := &taskAPI.CreateTaskRequest{ @@ -129,6 +130,7 @@ func TestCreateSandboxFail(t *testing.T) { id: testSandboxID, containers: make(map[string]*container), config: &runtimeConfig, + ctx: context.Background(), } req := &taskAPI.CreateTaskRequest{ @@ -184,6 +186,7 @@ func TestCreateSandboxConfigFail(t *testing.T) { id: testSandboxID, containers: make(map[string]*container), config: &runtimeConfig, + ctx: context.Background(), } req := &taskAPI.CreateTaskRequest{ @@ -245,6 +248,7 @@ func TestCreateContainerSuccess(t *testing.T) { sandbox: sandbox, containers: make(map[string]*container), config: &runtimeConfig, + ctx: context.Background(), } req := &taskAPI.CreateTaskRequest{ @@ -291,6 +295,7 @@ func TestCreateContainerFail(t *testing.T) { id: testContainerID, containers: make(map[string]*container), config: &runtimeConfig, + ctx: context.Background(), } req := &taskAPI.CreateTaskRequest{ @@ -351,6 +356,7 @@ func TestCreateContainerConfigFail(t *testing.T) { sandbox: sandbox, containers: make(map[string]*container), config: &runtimeConfig, + ctx: context.Background(), } req := &taskAPI.CreateTaskRequest{ diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index e8cdc947be..383d7d1238 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -43,6 +43,10 @@ const ( chSize = 128 exitCode255 = 255 + + // A time span used to wait for publish a containerd event, + // once it costs a longer time than timeOut, it will be canceld. + timeOut = 5 * time.Second ) var ( @@ -63,13 +67,16 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi vci.SetLogger(ctx, logger) katautils.SetLogger(ctx, logger, logger.Logger.Level) + ctx, cancel := context.WithCancel(ctx) + s := &service{ id: id, pid: uint32(os.Getpid()), - context: ctx, + ctx: ctx, containers: make(map[string]*container), events: make(chan interface{}, chSize), ec: make(chan exit, bufferSize), + cancel: cancel, mount: false, } @@ -102,12 +109,14 @@ type service struct { // will not do the rootfs mount. mount bool - context context.Context + ctx context.Context sandbox vc.VCSandbox containers map[string]*container config *oci.RuntimeConfig events chan interface{} + cancel func() + ec chan exit id string } @@ -209,7 +218,10 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container func (s *service) forward(publisher events.Publisher) { for e := range s.events { - if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { + ctx, cancel := context.WithTimeout(s.ctx, timeOut) + err := publisher.Publish(ctx, getTopic(e), e) + cancel() + if err != nil { logrus.WithError(err).Error("post event") } } @@ -230,7 +242,7 @@ func (s *service) sendL(evt interface{}) { s.eventSendMu.Unlock() } -func getTopic(ctx context.Context, e interface{}) string { +func getTopic(e interface{}) string { switch e.(type) { case *eventstypes.TaskCreate: return cdruntime.TaskCreateEventTopic @@ -766,6 +778,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ * } s.mu.Unlock() + s.cancel() + os.Exit(0) // This will never be called, but this is only there to make sure the