shimv2: Send task events to containerd/cri

The Runtime v2 supports an async event model. In order for the an upstream
caller (such as Docker) to get these events in the correct order a Runtime
v2 shim MUST implement some events.

For much more info, please see:
https://github.com/containerd/containerd/blob/master/runtime/v2/README.md#events

Fixes:#1204

Signed-off-by: fupan <lifupan@gmail.com>
This commit is contained in:
fupan 2019-02-01 11:25:15 +08:00 committed by Fupan Li
parent 29dae85ad5
commit 96e524d2a0

View File

@ -91,7 +91,8 @@ type exit struct {
// service is the shim implementation of a remote shim over GRPC
type service struct {
sync.Mutex
mu sync.Mutex
eventSendMu sync.Mutex
// pid Since this shimv2 cannot get the container processes pid from VM,
// thus for the returned values needed pid, just return this shim's
@ -211,6 +212,21 @@ func (s *service) forward(publisher events.Publisher) {
}
}
func (s *service) send(evt interface{}) {
// for unit test, it will not initialize s.events
if s.events != nil {
s.events <- evt
}
}
func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
if s.events != nil {
s.events <- evt
}
s.eventSendMu.Unlock()
}
func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
case *eventstypes.TaskCreate:
@ -291,8 +307,8 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
// Create a new sandbox or container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
//the network namespace created by cni plugin
netns, err := namespaces.NamespaceRequired(ctx)
@ -328,6 +344,20 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: s.pid,
})
return &taskAPI.CreateTaskResponse{
Pid: s.pid,
}, nil
@ -335,26 +365,39 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
defer s.eventSendMu.Unlock()
//start a container
if r.ExecID == "" {
err = startContainer(ctx, s, c)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskStart{
ContainerID: c.id,
Pid: s.pid,
})
} else {
//start an exec
_, err = startExec(ctx, s, r.ID, r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskExecStarted{
ContainerID: c.id,
ExecID: r.ExecID,
Pid: s.pid,
})
}
return &taskAPI.StartResponse{
@ -364,8 +407,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -394,6 +437,13 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
}
s.send(&eventstypes.TaskDelete{
ContainerID: s.id,
Pid: s.pid,
ExitStatus: c.exit,
ExitedAt: c.time,
})
return &taskAPI.DeleteResponse{
ExitStatus: c.exit,
ExitedAt: c.time,
@ -417,8 +467,8 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -436,13 +486,18 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
c.execs[r.ExecID] = execs
s.send(&eventstypes.TaskExecAdded{
ContainerID: c.id,
ExecID: r.ExecID,
})
return empty, nil
}
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -471,8 +526,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -515,8 +570,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -536,13 +591,17 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
c.status = task.StatusUnknown
}
s.send(&eventstypes.TaskPaused{
c.id,
})
return empty, err
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -560,13 +619,17 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
c.status = task.StatusUnknown
}
s.send(&eventstypes.TaskResumed{
c.id,
})
return empty, err
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
signum := syscall.Signal(r.Signal)
@ -617,8 +680,8 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -650,8 +713,8 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
return &taskAPI.ConnectResponse{
ShimPid: s.pid,
@ -661,12 +724,12 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.Lock()
s.mu.Lock()
if len(s.containers) != 0 {
s.Unlock()
s.mu.Unlock()
return empty, nil
}
s.Unlock()
s.mu.Unlock()
os.Exit(0)
@ -676,8 +739,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
@ -696,8 +759,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
var resources *specs.LinuxResources
v, err := typeurl.UnmarshalAny(r.Resources)
@ -721,9 +784,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
var ret uint32
s.Lock()
s.mu.Lock()
c, err := s.getContainer(r.ID)
s.Unlock()
s.mu.Unlock()
if err != nil {
return nil, err
@ -760,20 +823,22 @@ func (s *service) processExits() {
}
func (s *service) checkProcesses(e exit) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
id := e.execid
if id == "" {
id = e.id
}
s.events <- &eventstypes.TaskExit{
s.sendL(&eventstypes.TaskExit{
ContainerID: e.id,
ID: id,
Pid: e.pid,
ExitStatus: uint32(e.status),
ExitedAt: e.timestamp,
}
})
return
}