api: add sandbox Monitor API

It monitors the sandbox status and returns an error channel so that
the caller can watch for failures.

Fixes: #251

Signed-off-by: Peng Tao <bergwolf@gmail.com>
Peng Tao 2018-04-20 22:20:34 +08:00
parent 70b3c774f8
commit 35ebadcedc
11 changed files with 267 additions and 0 deletions
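
For context on how the new API is meant to be consumed, here is a minimal caller-side sketch (not part of this commit). It assumes the Kata runtime's virtcontainers import path; the package name, the watchSandbox function, and the logging are illustrative placeholders only.

package caller

import (
	"log"

	vc "github.com/kata-containers/runtime/virtcontainers"
)

// watchSandbox asks a running sandbox for its monitor channel and logs
// every liveness error reported by the periodic agent check.
func watchSandbox(s vc.VCSandbox) error {
	errCh, err := s.Monitor()
	if err != nil {
		// Monitor() fails if the sandbox is not running.
		return err
	}

	go func() {
		for err := range errCh {
			log.Printf("sandbox agent check failed: %v", err)
		}
		// The channel is closed when the monitor is stopped,
		// e.g. by Release() or Delete().
		log.Printf("sandbox monitor stopped")
	}()

	return nil
}

The returned channel is buffered (size 1) and is closed when the monitor stops, so the range loop above terminates cleanly after Release() or Delete().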


@@ -133,6 +133,9 @@ type agent interface {
// supported by the agent.
capabilities() capabilities
// check will check the agent liveness
check() error
// disconnect will disconnect the connection to the agent
disconnect() error


@@ -801,3 +801,8 @@ func (h *hyper) onlineCPUMem(cpus uint32) error {
// cc-agent uses udev to online CPUs automatically
return nil
}
func (h *hyper) check() error {
// cc-agent does not support check
return nil
}


@@ -49,6 +49,7 @@ type VCSandbox interface {
Pause() error
Resume() error
Release() error
Monitor() (chan error, error)
Delete() error
Status() SandboxStatus
CreateContainer(contConfig ContainerConfig) (VCContainer, error)


@@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"syscall"
"time"
kataclient "github.com/kata-containers/agent/protocols/client"
"github.com/kata-containers/agent/protocols/grpc"
@@ -934,6 +935,11 @@ func (k *kataAgent) disconnect() error {
return nil
}
func (k *kataAgent) check() error {
_, err := k.sendReq(&grpc.CheckRequest{})
return err
}
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
if err := k.connect(); err != nil {
return nil, err
@@ -943,6 +949,11 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
}
switch req := request.(type) {
case *grpc.CheckRequest:
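// Check is issued periodically by the sandbox monitor, so bound it
// with a timeout instead of blocking on an unresponsive agent.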
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := k.client.Check(ctx, req)
return nil, err
case *grpc.ExecProcessRequest:
_, err := k.client.ExecProcess(context.Background(), req)
return nil, err


@@ -191,10 +191,20 @@ func (p *gRPCProxy) OnlineCPUMem(ctx context.Context, req *pb.OnlineCPUMemReques
return emptyResp, nil
}
func (p *gRPCProxy) Check(ctx context.Context, req *pb.CheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{}, nil
}
func (p *gRPCProxy) Version(ctx context.Context, req *pb.CheckRequest) (*pb.VersionCheckResponse, error) {
return &pb.VersionCheckResponse{}, nil
}
func gRPCRegister(s *grpc.Server, srv interface{}) {
switch g := srv.(type) {
case *gRPCProxy:
pb.RegisterAgentServiceServer(s, g)
pb.RegisterHealthServer(s, g)
}
}
@@ -206,6 +216,7 @@ var reqList = []interface{}{
&pb.StartContainerRequest{},
&pb.RemoveContainerRequest{},
&pb.SignalProcessRequest{},
&pb.CheckRequest{},
}
func TestKataAgentSendReq(t *testing.T) {

virtcontainers/monitor.go (new file, 120 additions)

@@ -0,0 +1,120 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"sync"
"time"
)
const defaultCheckInterval = 10 * time.Second
type monitor struct {
sync.Mutex
sandbox *Sandbox
checkInterval time.Duration
watchers []chan error
running bool
stopCh chan bool
wg sync.WaitGroup
}
func newMonitor(s *Sandbox) *monitor {
return &monitor{
sandbox: s,
checkInterval: defaultCheckInterval,
stopCh: make(chan bool, 1),
}
}
func (m *monitor) newWatcher() (chan error, error) {
m.Lock()
defer m.Unlock()
watcher := make(chan error, 1)
m.watchers = append(m.watchers, watcher)
if !m.running {
m.running = true
m.wg.Add(1)
// create and start agent watcher
go func() {
tick := time.NewTicker(m.checkInterval)
for {
select {
case <-m.stopCh:
tick.Stop()
m.wg.Done()
return
case <-tick.C:
m.watchAgent()
}
}
}()
}
return watcher, nil
}
func (m *monitor) notify(err error) {
m.Lock()
defer m.Unlock()
if !m.running {
return
}
// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()
for _, c := range m.watchers {
c <- err
}
}
func (m *monitor) stop() {
// wait outside of the monitor lock for the watcher goroutine to exit.
defer m.wg.Wait()
m.Lock()
defer m.Unlock()
if !m.running {
return
}
defer func() {
m.stopCh <- true
m.watchers = nil
m.running = false
}()
// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()
for _, c := range m.watchers {
close(c)
}
}
func (m *monitor) watchAgent() {
err := m.sandbox.agent.check()
if err != nil {
m.notify(err)
}
}


@@ -0,0 +1,62 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMonitorSuccess(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)
// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()
m := newMonitor(s)
ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)
fakeErr := errors.New("foobar error")
m.notify(fakeErr)
resultErr := <-ch
assert.True(t, resultErr == fakeErr, "monitor notification mismatch %v vs. %v", resultErr, fakeErr)
m.stop()
}
func TestMonitorClosedChannel(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)
// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()
m := newMonitor(s)
ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)
close(ch)
fakeErr := errors.New("foobar error")
m.notify(fakeErr)
m.stop()
}


@@ -78,3 +78,8 @@ func (n *noopAgent) processListContainer(sandbox *Sandbox, c Container, options
func (n *noopAgent) onlineCPUMem(cpus uint32) error {
return nil
}
// check is the Noop agent health checker. It does nothing.
func (n *noopAgent) check() error {
return nil
}


@@ -99,3 +99,8 @@ func (p *Sandbox) Status() vc.SandboxStatus {
func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) {
return &Container{}, &vc.Process{}, nil
}
// Monitor implements the VCSandbox function of the same name.
func (p *Sandbox) Monitor() (chan error, error) {
return nil, nil
}


@@ -437,10 +437,12 @@ func unlockSandbox(lockFile *os.File) error {
type Sandbox struct {
id string
sync.Mutex
hypervisor hypervisor
agent agent
storage resourceStorage
network network
monitor *monitor
config *SandboxConfig
@@ -532,6 +534,9 @@ func (s *Sandbox) GetContainer(containerID string) VCContainer {
// Release closes the agent connection and removes sandbox from internal list.
func (s *Sandbox) Release() error {
globalSandboxList.removeSandbox(s.id)
if s.monitor != nil {
s.monitor.stop()
}
return s.agent.disconnect()
}
@@ -561,6 +566,21 @@ func (s *Sandbox) Status() SandboxStatus {
}
}
// Monitor returns an error channel that the caller can watch for sandbox errors
func (s *Sandbox) Monitor() (chan error, error) {
if s.state.State != StateRunning {
return nil, fmt.Errorf("Sandbox is not running")
}
s.Lock()
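// lazily create a single monitor instance; it is shared by all watchers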
if s.monitor == nil {
s.monitor = newMonitor(s)
}
s.Unlock()
return s.monitor.newWatcher()
}
func createAssets(sandboxConfig *SandboxConfig) error {
kernel, err := newAsset(sandboxConfig, kernelAsset)
if err != nil {
@@ -808,6 +828,10 @@ func (s *Sandbox) Delete() error {
globalSandboxList.removeSandbox(s.id)
if s.monitor != nil {
s.monitor.stop()
}
return s.storage.deleteSandboxResources(s.id, nil)
}


@@ -1388,3 +1388,23 @@ func TestEnterContainer(t *testing.T) {
_, _, err = s.EnterContainer(contID, cmd)
assert.Nil(t, err, "Enter container failed: %v", err)
}
func TestMonitor(t *testing.T) {
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
assert.Nil(t, err, "Failed to create sandbox: %v", err)
defer cleanUp()
_, err = s.Monitor()
assert.NotNil(t, err, "Monitoring a non-running sandbox should fail")
err = s.start()
assert.Nil(t, err, "Failed to start sandbox: %v", err)
_, err = s.Monitor()
assert.Nil(t, err, "Monitor sandbox failed: %v", err)
_, err = s.Monitor()
assert.Nil(t, err, "Monitor sandbox again failed: %v", err)
s.monitor.stop()
}