shimv2: Improve shim shutdown logic

Latest shimv2 publishes an event to containerd used ttrpc instead
of using containerd binary, thus shimv2 shouldn't call `os.Exit` to
terminate the shim's life,  but close the context on shutdown so that
events and other resources have hit the `defer`s.

Fixes:#1731

Signed-off-by: lifupan <lifupan@gmail.com>
This commit is contained in:
lifupan 2019-05-10 05:39:49 -04:00
parent 590ed09bfa
commit eabfd99734

View File

@ -16,7 +16,6 @@ import (
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
cdruntime "github.com/containerd/containerd/runtime" cdruntime "github.com/containerd/containerd/runtime"
cdshim "github.com/containerd/containerd/runtime/v2/shim" cdshim "github.com/containerd/containerd/runtime/v2/shim"
@ -58,7 +57,7 @@ var (
var vci vc.VC = &vc.VCImpl{} var vci vc.VC = &vc.VCImpl{}
// New returns a new shim service that can be used via GRPC // New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shim, error) { func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown func()) (cdshim.Shim, error) {
logger := logrus.WithField("ID", id) logger := logrus.WithField("ID", id)
// Discard the log before shim init its log output. Otherwise // Discard the log before shim init its log output. Otherwise
// it will output into stdio, from which containerd would like // it will output into stdio, from which containerd would like
@ -67,8 +66,6 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
vci.SetLogger(ctx, logger) vci.SetLogger(ctx, logger)
katautils.SetLogger(ctx, logger, logger.Logger.Level) katautils.SetLogger(ctx, logger, logger.Logger.Level)
ctx, cancel := context.WithCancel(ctx)
s := &service{ s := &service{
id: id, id: id,
pid: uint32(os.Getpid()), pid: uint32(os.Getpid()),
@ -76,13 +73,13 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
containers: make(map[string]*container), containers: make(map[string]*container),
events: make(chan interface{}, chSize), events: make(chan interface{}, chSize),
ec: make(chan exit, bufferSize), ec: make(chan exit, bufferSize),
cancel: cancel, cancel: shutdown,
mount: false, mount: false,
} }
go s.processExits() go s.processExits()
go s.forward(publisher) go s.forward(ctx, publisher)
return s, nil return s, nil
} }
@ -216,15 +213,20 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
return address, nil return address, nil
} }
func (s *service) forward(publisher events.Publisher) { func (s *service) forward(ctx context.Context, publisher cdshim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events { for e := range s.events {
ctx, cancel := context.WithTimeout(s.ctx, timeOut) ctx, cancel := context.WithTimeout(ctx, timeOut)
err := publisher.Publish(ctx, getTopic(e), e) err := publisher.Publish(ctx, getTopic(e), e)
cancel() cancel()
if err != nil { if err != nil {
logrus.WithError(err).Error("post event") logrus.WithError(err).Error("post event")
} }
} }
publisher.Close()
} }
func (s *service) send(evt interface{}) { func (s *service) send(evt interface{}) {
@ -769,11 +771,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
s.mu.Unlock() s.mu.Unlock()
s.cancel() s.cancel()
close(s.events)
os.Exit(0)
// This will never be called, but this is only there to make sure the
// program can compile.
return empty, nil return empty, nil
} }