Merge pull request #252 from bergwolf/sandbox_api_1

API: support sandbox monitor operation
This commit is contained in:
Sebastien Boeuf 2018-05-01 10:01:17 -07:00 committed by GitHub
commit 87aa1d77ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 317 additions and 34 deletions

View File

@ -133,6 +133,9 @@ type agent interface {
// supported by the agent. // supported by the agent.
capabilities() capabilities capabilities() capabilities
// check will check the agent liveness
check() error
// disconnect will disconnect the connection to the agent // disconnect will disconnect the connection to the agent
disconnect() error disconnect() error

View File

@ -801,3 +801,8 @@ func (h *hyper) onlineCPUMem(cpus uint32) error {
// cc-agent uses udev to online CPUs automatically // cc-agent uses udev to online CPUs automatically
return nil return nil
} }
func (h *hyper) check() error {
// cc-agent does not support check
return nil
}

View File

@ -49,6 +49,7 @@ type VCSandbox interface {
Pause() error Pause() error
Resume() error Resume() error
Release() error Release() error
Monitor() (chan error, error)
Delete() error Delete() error
Status() SandboxStatus Status() SandboxStatus
CreateContainer(contConfig ContainerConfig) (VCContainer, error) CreateContainer(contConfig ContainerConfig) (VCContainer, error)

View File

@ -15,7 +15,9 @@ import (
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/gogo/protobuf/proto"
kataclient "github.com/kata-containers/agent/protocols/client" kataclient "github.com/kata-containers/agent/protocols/client"
"github.com/kata-containers/agent/protocols/grpc" "github.com/kata-containers/agent/protocols/grpc"
vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations" vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
@ -73,6 +75,7 @@ type kataAgent struct {
shim shim shim shim
proxy proxy proxy proxy
client *kataclient.AgentClient client *kataclient.AgentClient
reqHandlers map[string]reqFunc
state KataAgentState state KataAgentState
keepConn bool keepConn bool
proxyBuiltIn bool proxyBuiltIn bool
@ -916,6 +919,7 @@ func (k *kataAgent) connect() error {
return err return err
} }
k.installReqFunc(client)
k.client = client k.client = client
return nil return nil
@ -929,11 +933,62 @@ func (k *kataAgent) disconnect() error {
if err := k.client.Close(); err != nil && err != golangGrpc.ErrClientConnClosing { if err := k.client.Close(); err != nil && err != golangGrpc.ErrClientConnClosing {
return err return err
} }
k.client = nil k.client = nil
k.reqHandlers = nil
return nil return nil
} }
func (k *kataAgent) check() error {
_, err := k.sendReq(&grpc.CheckRequest{})
return err
}
type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error)
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers = make(map[string]reqFunc)
k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...)
}
k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...)
}
k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...)
}
k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...)
}
k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...)
}
k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...)
}
k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...)
}
k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...)
}
k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...)
}
k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...)
}
k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...)
}
k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
}
}
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
if err := k.connect(); err != nil { if err := k.connect(); err != nil {
return nil, err return nil, err
@ -942,39 +997,11 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
defer k.disconnect() defer k.disconnect()
} }
switch req := request.(type) { msgName := proto.MessageName(request.(proto.Message))
case *grpc.ExecProcessRequest: handler := k.reqHandlers[msgName]
_, err := k.client.ExecProcess(context.Background(), req) if msgName == "" || handler == nil {
return nil, err return nil, fmt.Errorf("Invalid request type")
case *grpc.CreateSandboxRequest:
_, err := k.client.CreateSandbox(context.Background(), req)
return nil, err
case *grpc.DestroySandboxRequest:
_, err := k.client.DestroySandbox(context.Background(), req)
return nil, err
case *grpc.CreateContainerRequest:
_, err := k.client.CreateContainer(context.Background(), req)
return nil, err
case *grpc.StartContainerRequest:
_, err := k.client.StartContainer(context.Background(), req)
return nil, err
case *grpc.RemoveContainerRequest:
_, err := k.client.RemoveContainer(context.Background(), req)
return nil, err
case *grpc.SignalProcessRequest:
_, err := k.client.SignalProcess(context.Background(), req)
return nil, err
case *grpc.UpdateRoutesRequest:
_, err := k.client.UpdateRoutes(context.Background(), req)
return nil, err
case *grpc.UpdateInterfaceRequest:
ifc, err := k.client.UpdateInterface(context.Background(), req)
return ifc, err
case *grpc.OnlineCPUMemRequest:
return k.client.OnlineCPUMem(context.Background(), req)
case *grpc.ListProcessesRequest:
return k.client.ListProcesses(context.Background(), req)
default:
return nil, fmt.Errorf("Unknown gRPC type %T", req)
} }
return handler(context.Background(), request)
} }

View File

@ -191,10 +191,20 @@ func (p *gRPCProxy) OnlineCPUMem(ctx context.Context, req *pb.OnlineCPUMemReques
return emptyResp, nil return emptyResp, nil
} }
func (p *gRPCProxy) Check(ctx context.Context, req *pb.CheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{}, nil
}
func (p *gRPCProxy) Version(ctx context.Context, req *pb.CheckRequest) (*pb.VersionCheckResponse, error) {
return &pb.VersionCheckResponse{}, nil
}
func gRPCRegister(s *grpc.Server, srv interface{}) { func gRPCRegister(s *grpc.Server, srv interface{}) {
switch g := srv.(type) { switch g := srv.(type) {
case *gRPCProxy: case *gRPCProxy:
pb.RegisterAgentServiceServer(s, g) pb.RegisterAgentServiceServer(s, g)
pb.RegisterHealthServer(s, g)
} }
} }
@ -206,6 +216,7 @@ var reqList = []interface{}{
&pb.StartContainerRequest{}, &pb.StartContainerRequest{},
&pb.RemoveContainerRequest{}, &pb.RemoveContainerRequest{},
&pb.SignalProcessRequest{}, &pb.SignalProcessRequest{},
&pb.CheckRequest{},
} }
func TestKataAgentSendReq(t *testing.T) { func TestKataAgentSendReq(t *testing.T) {

120
virtcontainers/monitor.go Normal file
View File

@ -0,0 +1,120 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"sync"
"time"
)
const defaultCheckInterval = 10 * time.Second
type monitor struct {
sync.Mutex
sandbox *Sandbox
checkInterval time.Duration
watchers []chan error
running bool
stopCh chan bool
wg sync.WaitGroup
}
func newMonitor(s *Sandbox) *monitor {
return &monitor{
sandbox: s,
checkInterval: defaultCheckInterval,
stopCh: make(chan bool, 1),
}
}
func (m *monitor) newWatcher() (chan error, error) {
m.Lock()
defer m.Unlock()
watcher := make(chan error, 1)
m.watchers = append(m.watchers, watcher)
if !m.running {
m.running = true
m.wg.Add(1)
// create and start agent watcher
go func() {
tick := time.NewTicker(m.checkInterval)
for {
select {
case <-m.stopCh:
tick.Stop()
m.wg.Done()
return
case <-tick.C:
m.watchAgent()
}
}
}()
}
return watcher, nil
}
func (m *monitor) notify(err error) {
m.Lock()
defer m.Unlock()
if !m.running {
return
}
// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()
for _, c := range m.watchers {
c <- err
}
}
func (m *monitor) stop() {
// wait outside of monitor lock for the watcher channel to exit.
defer m.wg.Wait()
m.Lock()
defer m.Unlock()
if !m.running {
return
}
defer func() {
m.stopCh <- true
m.watchers = nil
m.running = false
}()
// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()
for _, c := range m.watchers {
close(c)
}
}
func (m *monitor) watchAgent() {
err := m.sandbox.agent.check()
if err != nil {
m.notify(err)
}
}

View File

@ -0,0 +1,62 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMonitorSuccess(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)
// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()
m := newMonitor(s)
ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)
fakeErr := errors.New("foobar error")
m.notify(fakeErr)
resultErr := <-ch
assert.True(t, resultErr == fakeErr, "monitor notification mismatch %v vs. %v", resultErr, fakeErr)
m.stop()
}
func TestMonitorClosedChannel(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)
// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()
m := newMonitor(s)
ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)
close(ch)
fakeErr := errors.New("foobar error")
m.notify(fakeErr)
m.stop()
}

View File

@ -78,3 +78,8 @@ func (n *noopAgent) processListContainer(sandbox *Sandbox, c Container, options
func (n *noopAgent) onlineCPUMem(cpus uint32) error { func (n *noopAgent) onlineCPUMem(cpus uint32) error {
return nil return nil
} }
// check is the Noop agent health checker. It does nothing.
func (n *noopAgent) check() error {
return nil
}

View File

@ -99,3 +99,8 @@ func (p *Sandbox) Status() vc.SandboxStatus {
func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) { func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) {
return &Container{}, &vc.Process{}, nil return &Container{}, &vc.Process{}, nil
} }
// Monitor implements the VCSandbox function of the same name.
func (p *Sandbox) Monitor() (chan error, error) {
return nil, nil
}

View File

@ -437,10 +437,12 @@ func unlockSandbox(lockFile *os.File) error {
type Sandbox struct { type Sandbox struct {
id string id string
sync.Mutex
hypervisor hypervisor hypervisor hypervisor
agent agent agent agent
storage resourceStorage storage resourceStorage
network network network network
monitor *monitor
config *SandboxConfig config *SandboxConfig
@ -532,6 +534,9 @@ func (s *Sandbox) GetContainer(containerID string) VCContainer {
// Release closes the agent connection and removes sandbox from internal list. // Release closes the agent connection and removes sandbox from internal list.
func (s *Sandbox) Release() error { func (s *Sandbox) Release() error {
globalSandboxList.removeSandbox(s.id) globalSandboxList.removeSandbox(s.id)
if s.monitor != nil {
s.monitor.stop()
}
return s.agent.disconnect() return s.agent.disconnect()
} }
@ -561,6 +566,21 @@ func (s *Sandbox) Status() SandboxStatus {
} }
} }
// Monitor returns a error channel for watcher to watch at
func (s *Sandbox) Monitor() (chan error, error) {
if s.state.State != StateRunning {
return nil, fmt.Errorf("Sandbox is not running")
}
s.Lock()
if s.monitor == nil {
s.monitor = newMonitor(s)
}
s.Unlock()
return s.monitor.newWatcher()
}
func createAssets(sandboxConfig *SandboxConfig) error { func createAssets(sandboxConfig *SandboxConfig) error {
kernel, err := newAsset(sandboxConfig, kernelAsset) kernel, err := newAsset(sandboxConfig, kernelAsset)
if err != nil { if err != nil {
@ -808,6 +828,10 @@ func (s *Sandbox) Delete() error {
globalSandboxList.removeSandbox(s.id) globalSandboxList.removeSandbox(s.id)
if s.monitor != nil {
s.monitor.stop()
}
return s.storage.deleteSandboxResources(s.id, nil) return s.storage.deleteSandboxResources(s.id, nil)
} }

View File

@ -1388,3 +1388,23 @@ func TestEnterContainer(t *testing.T) {
_, _, err = s.EnterContainer(contID, cmd) _, _, err = s.EnterContainer(contID, cmd)
assert.Nil(t, err, "Enter container failed: %v", err) assert.Nil(t, err, "Enter container failed: %v", err)
} }
func TestMonitor(t *testing.T) {
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
defer cleanUp()
_, err = s.Monitor()
assert.NotNil(t, err, "Monitoring non-running container should fail")
err = s.start()
assert.Nil(t, err, "Failed to start sandbox: %v", err)
_, err = s.Monitor()
assert.Nil(t, err, "Monitor sandbox failed: %v", err)
_, err = s.Monitor()
assert.Nil(t, err, "Monitor sandbox again failed: %v", err)
s.monitor.stop()
}