mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-30 09:13:29 +00:00
shimv2: Improve shim shutdown logic
Latest shimv2 publishes an event to containerd used ttrpc instead of using containerd binary, thus shimv2 shouldn't call `os.Exit` to terminate the shim's life, but close the context on shutdown so that events and other resources have hit the `defer`s. Fixes:#1731 Signed-off-by: lifupan <lifupan@gmail.com>
This commit is contained in:
parent
590ed09bfa
commit
eabfd99734
@ -16,7 +16,6 @@ import (
|
|||||||
|
|
||||||
eventstypes "github.com/containerd/containerd/api/events"
|
eventstypes "github.com/containerd/containerd/api/events"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
cdruntime "github.com/containerd/containerd/runtime"
|
cdruntime "github.com/containerd/containerd/runtime"
|
||||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||||
@ -58,7 +57,7 @@ var (
|
|||||||
var vci vc.VC = &vc.VCImpl{}
|
var vci vc.VC = &vc.VCImpl{}
|
||||||
|
|
||||||
// New returns a new shim service that can be used via GRPC
|
// New returns a new shim service that can be used via GRPC
|
||||||
func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shim, error) {
|
func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown func()) (cdshim.Shim, error) {
|
||||||
logger := logrus.WithField("ID", id)
|
logger := logrus.WithField("ID", id)
|
||||||
// Discard the log before shim init its log output. Otherwise
|
// Discard the log before shim init its log output. Otherwise
|
||||||
// it will output into stdio, from which containerd would like
|
// it will output into stdio, from which containerd would like
|
||||||
@ -67,8 +66,6 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
|||||||
vci.SetLogger(ctx, logger)
|
vci.SetLogger(ctx, logger)
|
||||||
katautils.SetLogger(ctx, logger, logger.Logger.Level)
|
katautils.SetLogger(ctx, logger, logger.Logger.Level)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
|
|
||||||
s := &service{
|
s := &service{
|
||||||
id: id,
|
id: id,
|
||||||
pid: uint32(os.Getpid()),
|
pid: uint32(os.Getpid()),
|
||||||
@ -76,13 +73,13 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
|||||||
containers: make(map[string]*container),
|
containers: make(map[string]*container),
|
||||||
events: make(chan interface{}, chSize),
|
events: make(chan interface{}, chSize),
|
||||||
ec: make(chan exit, bufferSize),
|
ec: make(chan exit, bufferSize),
|
||||||
cancel: cancel,
|
cancel: shutdown,
|
||||||
mount: false,
|
mount: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.processExits()
|
go s.processExits()
|
||||||
|
|
||||||
go s.forward(publisher)
|
go s.forward(ctx, publisher)
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
@ -216,15 +213,20 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
|
|||||||
return address, nil
|
return address, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) forward(publisher events.Publisher) {
|
func (s *service) forward(ctx context.Context, publisher cdshim.Publisher) {
|
||||||
|
ns, _ := namespaces.Namespace(ctx)
|
||||||
|
ctx = namespaces.WithNamespace(context.Background(), ns)
|
||||||
|
|
||||||
for e := range s.events {
|
for e := range s.events {
|
||||||
ctx, cancel := context.WithTimeout(s.ctx, timeOut)
|
ctx, cancel := context.WithTimeout(ctx, timeOut)
|
||||||
err := publisher.Publish(ctx, getTopic(e), e)
|
err := publisher.Publish(ctx, getTopic(e), e)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("post event")
|
logrus.WithError(err).Error("post event")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
publisher.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) send(evt interface{}) {
|
func (s *service) send(evt interface{}) {
|
||||||
@ -769,11 +771,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
|||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
s.cancel()
|
s.cancel()
|
||||||
|
close(s.events)
|
||||||
|
|
||||||
os.Exit(0)
|
|
||||||
|
|
||||||
// This will never be called, but this is only there to make sure the
|
|
||||||
// program can compile.
|
|
||||||
return empty, nil
|
return empty, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user