diff --git a/virtcontainers/agent.go b/virtcontainers/agent.go index 3c4f25116a..301df5cbf0 100644 --- a/virtcontainers/agent.go +++ b/virtcontainers/agent.go @@ -133,6 +133,9 @@ type agent interface { // supported by the agent. capabilities() capabilities + // check will check the agent liveness + check() error + // disconnect will disconnect the connection to the agent disconnect() error diff --git a/virtcontainers/hyperstart_agent.go b/virtcontainers/hyperstart_agent.go index 32518822a5..50a0efdcec 100644 --- a/virtcontainers/hyperstart_agent.go +++ b/virtcontainers/hyperstart_agent.go @@ -801,3 +801,8 @@ func (h *hyper) onlineCPUMem(cpus uint32) error { // cc-agent uses udev to online CPUs automatically return nil } + +func (h *hyper) check() error { + // cc-agent does not support check + return nil +} diff --git a/virtcontainers/interfaces.go b/virtcontainers/interfaces.go index 73eb40c4dc..f453a13e77 100644 --- a/virtcontainers/interfaces.go +++ b/virtcontainers/interfaces.go @@ -49,6 +49,7 @@ type VCSandbox interface { Pause() error Resume() error Release() error + Monitor() (chan error, error) Delete() error Status() SandboxStatus CreateContainer(contConfig ContainerConfig) (VCContainer, error) diff --git a/virtcontainers/kata_agent.go b/virtcontainers/kata_agent.go index 20cbfa71f6..2c2e8d464e 100644 --- a/virtcontainers/kata_agent.go +++ b/virtcontainers/kata_agent.go @@ -15,7 +15,9 @@ import ( "strconv" "strings" "syscall" + "time" + "github.com/gogo/protobuf/proto" kataclient "github.com/kata-containers/agent/protocols/client" "github.com/kata-containers/agent/protocols/grpc" vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations" @@ -73,6 +75,7 @@ type kataAgent struct { shim shim proxy proxy client *kataclient.AgentClient + reqHandlers map[string]reqFunc state KataAgentState keepConn bool proxyBuiltIn bool @@ -916,6 +919,7 @@ func (k *kataAgent) connect() error { return err } + k.installReqFunc(client) k.client = client return nil @@ -929,11 +933,62 @@ func (k *kataAgent) disconnect() error { if err := k.client.Close(); err != nil && err != golangGrpc.ErrClientConnClosing { return err } + k.client = nil + k.reqHandlers = nil return nil } +func (k *kataAgent) check() error { + _, err := k.sendReq(&grpc.CheckRequest{}) + return err +} + +type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error) + +func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { + k.reqHandlers = make(map[string]reqFunc) + k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...) + } + k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...) + } + k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...) + } + k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...) + } + k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...) + } + k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...) + } + k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...) + } + k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...) + } + k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...) + } + k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...) + } + k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...) + } + k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...) + } +} + func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { if err := k.connect(); err != nil { return nil, err @@ -942,39 +997,11 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { defer k.disconnect() } - switch req := request.(type) { - case *grpc.ExecProcessRequest: - _, err := k.client.ExecProcess(context.Background(), req) - return nil, err - case *grpc.CreateSandboxRequest: - _, err := k.client.CreateSandbox(context.Background(), req) - return nil, err - case *grpc.DestroySandboxRequest: - _, err := k.client.DestroySandbox(context.Background(), req) - return nil, err - case *grpc.CreateContainerRequest: - _, err := k.client.CreateContainer(context.Background(), req) - return nil, err - case *grpc.StartContainerRequest: - _, err := k.client.StartContainer(context.Background(), req) - return nil, err - case *grpc.RemoveContainerRequest: - _, err := k.client.RemoveContainer(context.Background(), req) - return nil, err - case *grpc.SignalProcessRequest: - _, err := k.client.SignalProcess(context.Background(), req) - return nil, err - case *grpc.UpdateRoutesRequest: - _, err := k.client.UpdateRoutes(context.Background(), req) - return nil, err - case *grpc.UpdateInterfaceRequest: - ifc, err := k.client.UpdateInterface(context.Background(), req) - return ifc, err - case *grpc.OnlineCPUMemRequest: - return k.client.OnlineCPUMem(context.Background(), req) - case *grpc.ListProcessesRequest: - return k.client.ListProcesses(context.Background(), req) - default: - return nil, fmt.Errorf("Unknown gRPC type %T", req) + msgName := proto.MessageName(request.(proto.Message)) + handler := k.reqHandlers[msgName] + if msgName == "" || handler == nil { + return nil, fmt.Errorf("Invalid request type") } + + return handler(context.Background(), request) } diff --git a/virtcontainers/kata_agent_test.go b/virtcontainers/kata_agent_test.go index 6438661d6c..d59ef9e486 100644 --- a/virtcontainers/kata_agent_test.go +++ b/virtcontainers/kata_agent_test.go @@ -191,10 +191,20 @@ func (p *gRPCProxy) OnlineCPUMem(ctx context.Context, req *pb.OnlineCPUMemReques return emptyResp, nil } +func (p *gRPCProxy) Check(ctx context.Context, req *pb.CheckRequest) (*pb.HealthCheckResponse, error) { + return &pb.HealthCheckResponse{}, nil +} + +func (p *gRPCProxy) Version(ctx context.Context, req *pb.CheckRequest) (*pb.VersionCheckResponse, error) { + return &pb.VersionCheckResponse{}, nil + +} + func gRPCRegister(s *grpc.Server, srv interface{}) { switch g := srv.(type) { case *gRPCProxy: pb.RegisterAgentServiceServer(s, g) + pb.RegisterHealthServer(s, g) } } @@ -206,6 +216,7 @@ var reqList = []interface{}{ &pb.StartContainerRequest{}, &pb.RemoveContainerRequest{}, &pb.SignalProcessRequest{}, + &pb.CheckRequest{}, } func TestKataAgentSendReq(t *testing.T) { diff --git a/virtcontainers/monitor.go b/virtcontainers/monitor.go new file mode 100644 index 0000000000..fdf523a7d6 --- /dev/null +++ b/virtcontainers/monitor.go @@ -0,0 +1,120 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package virtcontainers + +import ( + "sync" + "time" +) + +const defaultCheckInterval = 10 * time.Second + +type monitor struct { + sync.Mutex + + sandbox *Sandbox + checkInterval time.Duration + watchers []chan error + running bool + stopCh chan bool + wg sync.WaitGroup +} + +func newMonitor(s *Sandbox) *monitor { + return &monitor{ + sandbox: s, + checkInterval: defaultCheckInterval, + stopCh: make(chan bool, 1), + } +} + +func (m *monitor) newWatcher() (chan error, error) { + m.Lock() + defer m.Unlock() + + watcher := make(chan error, 1) + m.watchers = append(m.watchers, watcher) + + if !m.running { + m.running = true + m.wg.Add(1) + + // create and start agent watcher + go func() { + tick := time.NewTicker(m.checkInterval) + for { + select { + case <-m.stopCh: + tick.Stop() + m.wg.Done() + return + case <-tick.C: + m.watchAgent() + } + } + }() + } + + return watcher, nil +} + +func (m *monitor) notify(err error) { + m.Lock() + defer m.Unlock() + + if !m.running { + return + } + + // a watcher is not supposed to close the channel + // but just in case... + defer func() { + if x := recover(); x != nil { + virtLog.Warnf("watcher closed channel: %v", x) + } + }() + + for _, c := range m.watchers { + c <- err + } +} + +func (m *monitor) stop() { + // wait outside of monitor lock for the watcher channel to exit. + defer m.wg.Wait() + + m.Lock() + defer m.Unlock() + + if !m.running { + return + } + + defer func() { + m.stopCh <- true + m.watchers = nil + m.running = false + }() + + // a watcher is not supposed to close the channel + // but just in case... + defer func() { + if x := recover(); x != nil { + virtLog.Warnf("watcher closed channel: %v", x) + } + }() + + for _, c := range m.watchers { + close(c) + } +} + +func (m *monitor) watchAgent() { + err := m.sandbox.agent.check() + if err != nil { + m.notify(err) + } +} diff --git a/virtcontainers/monitor_test.go b/virtcontainers/monitor_test.go new file mode 100644 index 0000000000..6b03b4a151 --- /dev/null +++ b/virtcontainers/monitor_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package virtcontainers + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMonitorSuccess(t *testing.T) { + contID := "505" + contConfig := newTestContainerConfigNoop(contID) + hConfig := newHypervisorConfig(nil, nil) + + // create a sandbox + s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil) + if err != nil { + t.Fatal(err) + } + defer cleanUp() + + m := newMonitor(s) + + ch, err := m.newWatcher() + assert.Nil(t, err, "newWatcher failed: %v", err) + + fakeErr := errors.New("foobar error") + m.notify(fakeErr) + resultErr := <-ch + assert.True(t, resultErr == fakeErr, "monitor notification mismatch %v vs. %v", resultErr, fakeErr) + + m.stop() +} + +func TestMonitorClosedChannel(t *testing.T) { + contID := "505" + contConfig := newTestContainerConfigNoop(contID) + hConfig := newHypervisorConfig(nil, nil) + + // create a sandbox + s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil) + if err != nil { + t.Fatal(err) + } + defer cleanUp() + + m := newMonitor(s) + + ch, err := m.newWatcher() + assert.Nil(t, err, "newWatcher failed: %v", err) + + close(ch) + fakeErr := errors.New("foobar error") + m.notify(fakeErr) + + m.stop() +} diff --git a/virtcontainers/noop_agent.go b/virtcontainers/noop_agent.go index 7a2f8359e9..725be20718 100644 --- a/virtcontainers/noop_agent.go +++ b/virtcontainers/noop_agent.go @@ -78,3 +78,8 @@ func (n *noopAgent) processListContainer(sandbox *Sandbox, c Container, options func (n *noopAgent) onlineCPUMem(cpus uint32) error { return nil } + +// check is the Noop agent health checker. It does nothing. +func (n *noopAgent) check() error { + return nil +} diff --git a/virtcontainers/pkg/vcmock/sandbox.go b/virtcontainers/pkg/vcmock/sandbox.go index dedc519e59..cca89c7cd4 100644 --- a/virtcontainers/pkg/vcmock/sandbox.go +++ b/virtcontainers/pkg/vcmock/sandbox.go @@ -99,3 +99,8 @@ func (p *Sandbox) Status() vc.SandboxStatus { func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) { return &Container{}, &vc.Process{}, nil } + +// Monitor implements the VCSandbox function of the same name. +func (p *Sandbox) Monitor() (chan error, error) { + return nil, nil +} diff --git a/virtcontainers/sandbox.go b/virtcontainers/sandbox.go index 0f68c93e30..8cc92dc435 100644 --- a/virtcontainers/sandbox.go +++ b/virtcontainers/sandbox.go @@ -437,10 +437,12 @@ func unlockSandbox(lockFile *os.File) error { type Sandbox struct { id string + sync.Mutex hypervisor hypervisor agent agent storage resourceStorage network network + monitor *monitor config *SandboxConfig @@ -532,6 +534,9 @@ func (s *Sandbox) GetContainer(containerID string) VCContainer { // Release closes the agent connection and removes sandbox from internal list. func (s *Sandbox) Release() error { globalSandboxList.removeSandbox(s.id) + if s.monitor != nil { + s.monitor.stop() + } return s.agent.disconnect() } @@ -561,6 +566,21 @@ func (s *Sandbox) Status() SandboxStatus { } } +// Monitor returns a error channel for watcher to watch at +func (s *Sandbox) Monitor() (chan error, error) { + if s.state.State != StateRunning { + return nil, fmt.Errorf("Sandbox is not running") + } + + s.Lock() + if s.monitor == nil { + s.monitor = newMonitor(s) + } + s.Unlock() + + return s.monitor.newWatcher() +} + func createAssets(sandboxConfig *SandboxConfig) error { kernel, err := newAsset(sandboxConfig, kernelAsset) if err != nil { @@ -808,6 +828,10 @@ func (s *Sandbox) Delete() error { globalSandboxList.removeSandbox(s.id) + if s.monitor != nil { + s.monitor.stop() + } + return s.storage.deleteSandboxResources(s.id, nil) } diff --git a/virtcontainers/sandbox_test.go b/virtcontainers/sandbox_test.go index 86c409564f..c57d4a0ce8 100644 --- a/virtcontainers/sandbox_test.go +++ b/virtcontainers/sandbox_test.go @@ -1388,3 +1388,23 @@ func TestEnterContainer(t *testing.T) { _, _, err = s.EnterContainer(contID, cmd) assert.Nil(t, err, "Enter container failed: %v", err) } + +func TestMonitor(t *testing.T) { + s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil) + assert.Nil(t, err, "VirtContainers should not allow empty sandboxes") + defer cleanUp() + + _, err = s.Monitor() + assert.NotNil(t, err, "Monitoring non-running container should fail") + + err = s.start() + assert.Nil(t, err, "Failed to start sandbox: %v", err) + + _, err = s.Monitor() + assert.Nil(t, err, "Monitor sandbox failed: %v", err) + + _, err = s.Monitor() + assert.Nil(t, err, "Monitor sandbox again failed: %v", err) + + s.monitor.stop() +}