Files
kata-containers/virtcontainers/monitor.go
Ace-Tang 88e281cb14 monitor: enlarge watch buffer
Enlarge the watcher buffer. Otherwise, if shim.Wait and shim.watchSandbox receive a signal at the
same time, nobody consumes the monitor watcher channel and the monitor deadlocks, as shown below:

goroutine 60 [semacquire, 641 minutes]:
sync.runtime_SemacquireMutex(0xc00037a144, 0x42cd00)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc00037a140)
/usr/local/go/src/sync/mutex.go:134 +0x109
github.com/kata-containers/runtime/virtcontainers.(*monitor).stop(0xc00037a140)
/go/src/github.com/kata-containers/runtime/virtcontainers/monitor.go:95 +0x5f
github.com/kata-containers/runtime/virtcontainers.(*Sandbox).Delete(0xc0003c8160, 0x78effdc01, 0x0)
/go/src/github.com/kata-containers/runtime/virtcontainers/sandbox.go:773 +0x4fb
github.com/kata-containers/runtime/containerd-shim-v2.wait(0xc000478b80, 0xc000338240, 0x0, 0x0, 0x107d540, 0xc0000100f0, 0x107d520)
/go/src/github.com/kata-containers/runtime/containerd-shim-v2/wait.go:60 +0x3e5
created by github.com/kata-containers/runtime/containerd-shim-v2.startContainer
/go/src/github.com/kata-containers/runtime/containerd-shim-v2/start.go:74 +0x3e5

goroutine 53 [chan send, 641 minutes]:
github.com/kata-containers/runtime/virtcontainers.(*monitor).notify(0xc00037a140, 0x107cfe0, 0xc0001ec160)
/go/src/github.com/kata-containers/runtime/virtcontainers/monitor.go:87 +0xed
github.com/kata-containers/runtime/virtcontainers.(*monitor).watchAgent(0xc00037a140)
/go/src/github.com/kata-containers/runtime/virtcontainers/monitor.go:125 +0xab
github.com/kata-containers/runtime/virtcontainers.(*monitor).newWatcher.func1(0xc00037a140)
/go/src/github.com/kata-containers/runtime/virtcontainers/monitor.go:59 +0x72
created by github.com/kata-containers/runtime/virtcontainers.(*monitor).newWatcher
/go/src/github.com/kata-containers/runtime/virtcontainers/monitor.go:49 +0x125

Fixes: #1981

Signed-off-by: Ace-Tang <aceapril@126.com>
2019-08-21 11:35:48 +08:00

145 lines
2.5 KiB
Go

// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"sync"
"time"
"github.com/pkg/errors"
)
const (
	// defaultCheckInterval is the polling period newMonitor assigns to a
	// monitor; the polling goroutine checks the hypervisor and the agent
	// once per tick of this interval.
	defaultCheckInterval = time.Second

	// watcherChannelSize is the buffer capacity of every channel handed out
	// by newWatcher. notify never blocks on a watcher: once this buffer is
	// full, further errors for that watcher are dropped.
	watcherChannelSize = 128
)
// monitor periodically checks a sandbox's hypervisor and agent and reports
// any failure to the channels registered through newWatcher.
type monitor struct {
// Mutex guards watchers and running (newWatcher, notify and stop all
// take it before touching those fields).
sync.Mutex
// sandbox whose agent and hypervisor are checked; notify also marks its
// agent dead.
sandbox *Sandbox
// checkInterval is the ticker period of the polling goroutine.
checkInterval time.Duration
// watchers are the channels returned by newWatcher; notify sends errors
// on them and stop closes them and resets the slice to nil.
watchers []chan error
// wg tracks the polling goroutine so stop can wait for it to exit.
wg sync.WaitGroup
// running is true while the polling goroutine is active.
running bool
// stopCh carries the stop signal from stop to the polling goroutine.
stopCh chan bool
}
// newMonitor returns an idle monitor for sandbox s.
//
// No goroutine is started here; the first call to newWatcher starts the
// polling loop. stopCh is given a one-slot buffer so that stop can post its
// signal while holding the monitor lock without waiting for a receiver.
func newMonitor(s *Sandbox) *monitor {
	m := &monitor{sandbox: s}
	m.checkInterval = defaultCheckInterval
	m.stopCh = make(chan bool, 1)
	return m
}
// newWatcher registers and returns a new buffered channel on which notify
// will deliver hypervisor/agent check failures. The returned error is
// always nil.
//
// The first watcher registered while the monitor is not running also starts
// the polling goroutine: every checkInterval it runs watchHypervisor and then
// watchAgent, until stop signals it through stopCh.
func (m *monitor) newWatcher() (chan error, error) {
	m.Lock()
	defer m.Unlock()

	ch := make(chan error, watcherChannelSize)
	m.watchers = append(m.watchers, ch)

	if m.running {
		// The polling goroutine is already active; just hand out the channel.
		return ch, nil
	}

	m.running = true
	m.wg.Add(1)

	// create and start agent watcher
	go func() {
		defer m.wg.Done()

		ticker := time.NewTicker(m.checkInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				m.watchHypervisor()
				m.watchAgent()
			case <-m.stopCh:
				return
			}
		}
	}()

	return ch, nil
}
// notify reports err to every registered watcher.
//
// It first marks the sandbox agent as dead (this happens even when the
// monitor is no longer running). Then, under the monitor lock, it performs a
// non-blocking send of err on each watcher channel. A watcher whose buffer is
// full loses the message; the earliest queued error is the useful one.
//
// A watcher is not supposed to close its channel, but just in case one does,
// the resulting send panic is recovered for that channel only, so the
// remaining watchers are still notified.
func (m *monitor) notify(err error) {
	m.sandbox.agent.markDead()

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	for _, c := range m.watchers {
		// Scope the recovery to a single channel so one misbehaving watcher
		// cannot stop delivery to the others.
		func() {
			defer func() {
				if x := recover(); x != nil {
					virtLog.Warnf("watcher closed channel: %v", x)
				}
			}()

			// Never block here: this runs with the monitor lock held, and a
			// blocked send would keep stop() from ever acquiring it.
			select {
			case c <- err:
			default:
				virtLog.WithField("channel-size", watcherChannelSize).Warnf("watcher channel is full, throw notify message")
			}
		}()
	}
}
// stop terminates the polling goroutine, closes every watcher channel and
// clears the watcher list, then waits for the polling goroutine to exit.
//
// When the monitor is not running it only waits on m.wg. After stop returns,
// a later newWatcher call starts a fresh polling goroutine.
//
// A watcher is not supposed to close its own channel, but just in case one
// does, the double-close panic is recovered for that channel only, so every
// other watcher channel is still closed and its receivers are released.
func (m *monitor) stop() {
	// Wait for the polling goroutine outside of the monitor lock: it may be
	// blocked in notify() waiting for that same lock.
	defer m.wg.Wait()

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	// stopCh is created with a one-slot buffer, so this send does not block
	// while the lock is held.
	m.stopCh <- true

	for _, c := range m.watchers {
		func() {
			defer func() {
				if x := recover(); x != nil {
					virtLog.Warnf("watcher closed channel: %v", x)
				}
			}()
			close(c)
		}()
	}

	m.watchers = nil
	m.running = false
}
// watchAgent runs one health check against the sandbox agent and, if it
// fails, forwards the wrapped error to the watchers via notify.
func (m *monitor) watchAgent() {
	if err := m.sandbox.agent.check(); err != nil {
		// TODO: define and export error types
		m.notify(errors.Wrap(err, "failed to ping agent"))
	}
}
// watchHypervisor runs one health check against the sandbox hypervisor.
// On failure it forwards the wrapped error to the watchers via notify and
// returns the original (unwrapped) check error; on success it returns nil.
func (m *monitor) watchHypervisor() error {
	err := m.sandbox.hypervisor.check()
	if err == nil {
		return nil
	}

	m.notify(errors.Wrap(err, "failed to ping hypervisor process"))
	return err
}