mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-05 03:26:37 +00:00
shimv2: Send task events to containerd/cri
The Runtime v2 supports an async event model. In order for the an upstream caller (such as Docker) to get these events in the correct order a Runtime v2 shim MUST implement some events. For much more info, please see: https://github.com/containerd/containerd/blob/master/runtime/v2/README.md#events Fixes:#1204 Signed-off-by: fupan <lifupan@gmail.com>
This commit is contained in:
parent
29dae85ad5
commit
96e524d2a0
@ -91,7 +91,8 @@ type exit struct {
|
|||||||
|
|
||||||
// service is the shim implementation of a remote shim over GRPC
|
// service is the shim implementation of a remote shim over GRPC
|
||||||
type service struct {
|
type service struct {
|
||||||
sync.Mutex
|
mu sync.Mutex
|
||||||
|
eventSendMu sync.Mutex
|
||||||
|
|
||||||
// pid Since this shimv2 cannot get the container processes pid from VM,
|
// pid Since this shimv2 cannot get the container processes pid from VM,
|
||||||
// thus for the returned values needed pid, just return this shim's
|
// thus for the returned values needed pid, just return this shim's
|
||||||
@ -211,6 +212,21 @@ func (s *service) forward(publisher events.Publisher) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *service) send(evt interface{}) {
|
||||||
|
// for unit test, it will not initialize s.events
|
||||||
|
if s.events != nil {
|
||||||
|
s.events <- evt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *service) sendL(evt interface{}) {
|
||||||
|
s.eventSendMu.Lock()
|
||||||
|
if s.events != nil {
|
||||||
|
s.events <- evt
|
||||||
|
}
|
||||||
|
s.eventSendMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func getTopic(ctx context.Context, e interface{}) string {
|
func getTopic(ctx context.Context, e interface{}) string {
|
||||||
switch e.(type) {
|
switch e.(type) {
|
||||||
case *eventstypes.TaskCreate:
|
case *eventstypes.TaskCreate:
|
||||||
@ -291,8 +307,8 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
|
|||||||
|
|
||||||
// Create a new sandbox or container with the underlying OCI runtime
|
// Create a new sandbox or container with the underlying OCI runtime
|
||||||
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
|
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
//the network namespace created by cni plugin
|
//the network namespace created by cni plugin
|
||||||
netns, err := namespaces.NamespaceRequired(ctx)
|
netns, err := namespaces.NamespaceRequired(ctx)
|
||||||
@ -328,6 +344,20 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
|||||||
|
|
||||||
s.containers[r.ID] = container
|
s.containers[r.ID] = container
|
||||||
|
|
||||||
|
s.send(&eventstypes.TaskCreate{
|
||||||
|
ContainerID: r.ID,
|
||||||
|
Bundle: r.Bundle,
|
||||||
|
Rootfs: r.Rootfs,
|
||||||
|
IO: &eventstypes.TaskIO{
|
||||||
|
Stdin: r.Stdin,
|
||||||
|
Stdout: r.Stdout,
|
||||||
|
Stderr: r.Stderr,
|
||||||
|
Terminal: r.Terminal,
|
||||||
|
},
|
||||||
|
Checkpoint: r.Checkpoint,
|
||||||
|
Pid: s.pid,
|
||||||
|
})
|
||||||
|
|
||||||
return &taskAPI.CreateTaskResponse{
|
return &taskAPI.CreateTaskResponse{
|
||||||
Pid: s.pid,
|
Pid: s.pid,
|
||||||
}, nil
|
}, nil
|
||||||
@ -335,26 +365,39 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
|||||||
|
|
||||||
// Start a process
|
// Start a process
|
||||||
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
|
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hold the send lock so that the start events are sent before any exit events in the error case
|
||||||
|
s.eventSendMu.Lock()
|
||||||
|
defer s.eventSendMu.Unlock()
|
||||||
|
|
||||||
//start a container
|
//start a container
|
||||||
if r.ExecID == "" {
|
if r.ExecID == "" {
|
||||||
err = startContainer(ctx, s, c)
|
err = startContainer(ctx, s, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errdefs.ToGRPC(err)
|
return nil, errdefs.ToGRPC(err)
|
||||||
}
|
}
|
||||||
|
s.send(&eventstypes.TaskStart{
|
||||||
|
ContainerID: c.id,
|
||||||
|
Pid: s.pid,
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
//start an exec
|
//start an exec
|
||||||
_, err = startExec(ctx, s, r.ID, r.ExecID)
|
_, err = startExec(ctx, s, r.ID, r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errdefs.ToGRPC(err)
|
return nil, errdefs.ToGRPC(err)
|
||||||
}
|
}
|
||||||
|
s.send(&eventstypes.TaskExecStarted{
|
||||||
|
ContainerID: c.id,
|
||||||
|
ExecID: r.ExecID,
|
||||||
|
Pid: s.pid,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return &taskAPI.StartResponse{
|
return &taskAPI.StartResponse{
|
||||||
@ -364,8 +407,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
|||||||
|
|
||||||
// Delete the initial process and container
|
// Delete the initial process and container
|
||||||
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
|
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -394,6 +437,13 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.send(&eventstypes.TaskDelete{
|
||||||
|
ContainerID: s.id,
|
||||||
|
Pid: s.pid,
|
||||||
|
ExitStatus: c.exit,
|
||||||
|
ExitedAt: c.time,
|
||||||
|
})
|
||||||
|
|
||||||
return &taskAPI.DeleteResponse{
|
return &taskAPI.DeleteResponse{
|
||||||
ExitStatus: c.exit,
|
ExitStatus: c.exit,
|
||||||
ExitedAt: c.time,
|
ExitedAt: c.time,
|
||||||
@ -417,8 +467,8 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
|||||||
|
|
||||||
// Exec an additional process inside the container
|
// Exec an additional process inside the container
|
||||||
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
|
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -436,13 +486,18 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
|
|||||||
|
|
||||||
c.execs[r.ExecID] = execs
|
c.execs[r.ExecID] = execs
|
||||||
|
|
||||||
|
s.send(&eventstypes.TaskExecAdded{
|
||||||
|
ContainerID: c.id,
|
||||||
|
ExecID: r.ExecID,
|
||||||
|
})
|
||||||
|
|
||||||
return empty, nil
|
return empty, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResizePty of a process
|
// ResizePty of a process
|
||||||
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
|
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -471,8 +526,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
|
|||||||
|
|
||||||
// State returns runtime state information for a process
|
// State returns runtime state information for a process
|
||||||
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
|
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -515,8 +570,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
|
|||||||
|
|
||||||
// Pause the container
|
// Pause the container
|
||||||
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
|
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -536,13 +591,17 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
|
|||||||
c.status = task.StatusUnknown
|
c.status = task.StatusUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.send(&eventstypes.TaskPaused{
|
||||||
|
c.id,
|
||||||
|
})
|
||||||
|
|
||||||
return empty, err
|
return empty, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resume the container
|
// Resume the container
|
||||||
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
|
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -560,13 +619,17 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
|
|||||||
c.status = task.StatusUnknown
|
c.status = task.StatusUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.send(&eventstypes.TaskResumed{
|
||||||
|
c.id,
|
||||||
|
})
|
||||||
|
|
||||||
return empty, err
|
return empty, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kill a process with the provided signal
|
// Kill a process with the provided signal
|
||||||
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
|
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
signum := syscall.Signal(r.Signal)
|
signum := syscall.Signal(r.Signal)
|
||||||
|
|
||||||
@ -617,8 +680,8 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
|
|||||||
|
|
||||||
// CloseIO of a process
|
// CloseIO of a process
|
||||||
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
|
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -650,8 +713,8 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
|
|||||||
|
|
||||||
// Connect returns shim information such as the shim's pid
|
// Connect returns shim information such as the shim's pid
|
||||||
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
|
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
return &taskAPI.ConnectResponse{
|
return &taskAPI.ConnectResponse{
|
||||||
ShimPid: s.pid,
|
ShimPid: s.pid,
|
||||||
@ -661,12 +724,12 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
|
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
if len(s.containers) != 0 {
|
if len(s.containers) != 0 {
|
||||||
s.Unlock()
|
s.mu.Unlock()
|
||||||
return empty, nil
|
return empty, nil
|
||||||
}
|
}
|
||||||
s.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
|
||||||
@ -676,8 +739,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
|
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -696,8 +759,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
|
|||||||
|
|
||||||
// Update a running container
|
// Update a running container
|
||||||
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
|
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
var resources *specs.LinuxResources
|
var resources *specs.LinuxResources
|
||||||
v, err := typeurl.UnmarshalAny(r.Resources)
|
v, err := typeurl.UnmarshalAny(r.Resources)
|
||||||
@ -721,9 +784,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
|
|||||||
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
|
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
|
||||||
var ret uint32
|
var ret uint32
|
||||||
|
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
c, err := s.getContainer(r.ID)
|
c, err := s.getContainer(r.ID)
|
||||||
s.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -760,20 +823,22 @@ func (s *service) processExits() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) checkProcesses(e exit) {
|
func (s *service) checkProcesses(e exit) {
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
defer s.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
id := e.execid
|
id := e.execid
|
||||||
if id == "" {
|
if id == "" {
|
||||||
id = e.id
|
id = e.id
|
||||||
}
|
}
|
||||||
s.events <- &eventstypes.TaskExit{
|
|
||||||
|
s.sendL(&eventstypes.TaskExit{
|
||||||
ContainerID: e.id,
|
ContainerID: e.id,
|
||||||
ID: id,
|
ID: id,
|
||||||
Pid: e.pid,
|
Pid: e.pid,
|
||||||
ExitStatus: uint32(e.status),
|
ExitStatus: uint32(e.status),
|
||||||
ExitedAt: e.timestamp,
|
ExitedAt: e.timestamp,
|
||||||
}
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user