mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-16 16:32:03 +00:00
Merge pull request #3997 from liubin/backport-2.4
stable-2.4 | runtime: Stop getting OOM events from agent for "ttrpc closed" error
This commit is contained in:
commit
6abbcc551c
@ -15,7 +15,6 @@ import (
|
|||||||
"github.com/containerd/containerd/api/types/task"
|
"github.com/containerd/containerd/api/types/task"
|
||||||
"github.com/containerd/containerd/mount"
|
"github.com/containerd/containerd/mount"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
|
|
||||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/oci"
|
"github.com/kata-containers/kata-containers/src/runtime/pkg/oci"
|
||||||
)
|
)
|
||||||
@ -68,6 +67,7 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32,
|
|||||||
if c.cType.IsSandbox() {
|
if c.cType.IsSandbox() {
|
||||||
// cancel watcher
|
// cancel watcher
|
||||||
if s.monitor != nil {
|
if s.monitor != nil {
|
||||||
|
shimLog.WithField("sandbox", s.sandbox.ID()).Info("cancel watcher")
|
||||||
s.monitor <- nil
|
s.monitor <- nil
|
||||||
}
|
}
|
||||||
if err = s.sandbox.Stop(ctx, true); err != nil {
|
if err = s.sandbox.Stop(ctx, true); err != nil {
|
||||||
@ -111,6 +111,7 @@ func watchSandbox(ctx context.Context, s *service) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
err := <-s.monitor
|
err := <-s.monitor
|
||||||
|
shimLog.WithError(err).WithField("sandbox", s.sandbox.ID()).Info("watchSandbox gets an error or stop signal")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -156,13 +157,11 @@ func watchOOMEvents(ctx context.Context, s *service) {
|
|||||||
default:
|
default:
|
||||||
containerID, err := s.sandbox.GetOOMEvent(ctx)
|
containerID, err := s.sandbox.GetOOMEvent(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shimLog.WithError(err).Warn("failed to get OOM event from sandbox")
|
if err.Error() == "ttrpc: closed" || err.Error() == "Dead agent" {
|
||||||
// If the GetOOMEvent call is not implemented, then the agent is most likely an older version,
|
shimLog.WithError(err).Warn("agent has shutdown, return from watching of OOM events")
|
||||||
// stop attempting to get OOM events.
|
|
||||||
// for rust agent, the response code is not found
|
|
||||||
if isGRPCErrorCode(codes.NotFound, err) || err.Error() == "Dead agent" {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
shimLog.WithError(err).Warn("failed to get OOM event from sandbox")
|
||||||
time.Sleep(defaultCheckInterval)
|
time.Sleep(defaultCheckInterval)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,8 @@ const (
|
|||||||
watcherChannelSize = 128
|
watcherChannelSize = 128
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var monitorLog = virtLog.WithField("subsystem", "virtcontainers/monitor")
|
||||||
|
|
||||||
// nolint: govet
|
// nolint: govet
|
||||||
type monitor struct {
|
type monitor struct {
|
||||||
watchers []chan error
|
watchers []chan error
|
||||||
@ -33,6 +35,9 @@ type monitor struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newMonitor(s *Sandbox) *monitor {
|
func newMonitor(s *Sandbox) *monitor {
|
||||||
|
// there should only be one monitor for one sandbox,
|
||||||
|
// so it's safe to keep monitorLog as a global variable.
|
||||||
|
monitorLog = monitorLog.WithField("sandbox", s.ID())
|
||||||
return &monitor{
|
return &monitor{
|
||||||
sandbox: s,
|
sandbox: s,
|
||||||
checkInterval: defaultCheckInterval,
|
checkInterval: defaultCheckInterval,
|
||||||
@ -72,6 +77,7 @@ func (m *monitor) newWatcher(ctx context.Context) (chan error, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *monitor) notify(ctx context.Context, err error) {
|
func (m *monitor) notify(ctx context.Context, err error) {
|
||||||
|
monitorLog.WithError(err).Warn("notify on errors")
|
||||||
m.sandbox.agent.markDead(ctx)
|
m.sandbox.agent.markDead(ctx)
|
||||||
|
|
||||||
m.Lock()
|
m.Lock()
|
||||||
@ -85,18 +91,19 @@ func (m *monitor) notify(ctx context.Context, err error) {
|
|||||||
// but just in case...
|
// but just in case...
|
||||||
defer func() {
|
defer func() {
|
||||||
if x := recover(); x != nil {
|
if x := recover(); x != nil {
|
||||||
virtLog.Warnf("watcher closed channel: %v", x)
|
monitorLog.Warnf("watcher closed channel: %v", x)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for _, c := range m.watchers {
|
for _, c := range m.watchers {
|
||||||
|
monitorLog.WithError(err).Warn("write error to watcher")
|
||||||
// throw away message can not write to channel
|
// throw away message can not write to channel
|
||||||
// make it not stuck, the first error is useful.
|
// make it not stuck, the first error is useful.
|
||||||
select {
|
select {
|
||||||
case c <- err:
|
case c <- err:
|
||||||
|
|
||||||
default:
|
default:
|
||||||
virtLog.WithField("channel-size", watcherChannelSize).Warnf("watcher channel is full, throw notify message")
|
monitorLog.WithField("channel-size", watcherChannelSize).Warnf("watcher channel is full, throw notify message")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -104,6 +111,7 @@ func (m *monitor) notify(ctx context.Context, err error) {
|
|||||||
func (m *monitor) stop() {
|
func (m *monitor) stop() {
|
||||||
// wait outside of monitor lock for the watcher channel to exit.
|
// wait outside of monitor lock for the watcher channel to exit.
|
||||||
defer m.wg.Wait()
|
defer m.wg.Wait()
|
||||||
|
monitorLog.Info("stopping monitor")
|
||||||
|
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
@ -122,7 +130,7 @@ func (m *monitor) stop() {
|
|||||||
// but just in case...
|
// but just in case...
|
||||||
defer func() {
|
defer func() {
|
||||||
if x := recover(); x != nil {
|
if x := recover(); x != nil {
|
||||||
virtLog.Warnf("watcher closed channel: %v", x)
|
monitorLog.Warnf("watcher closed channel: %v", x)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user