mirror of https://github.com/kata-containers/kata-containers.git
shimv2: Send task events to containerd/cri
The Runtime v2 API supports an async event model. In order for an upstream caller (such as Docker) to receive these events in the correct order, a Runtime v2 shim MUST implement some events. For more information, see: https://github.com/containerd/containerd/blob/master/runtime/v2/README.md#events

Fixes: #1204

Signed-off-by: fupan <lifupan@gmail.com>
parent 29dae85ad5
commit 96e524d2a0
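For context: the shim queues events on an internal channel, and a forward goroutine publishes each one to containerd. The sketch below reconstructs that pipeline from the identifiers visible in this diff (service, events, forward, getTopic, eventSendMu, mu); the channel element type, the logging, and the getTopic stub are assumptions, not code from this commit.

// Sketch only: a minimal event pipeline of the shape this commit extends.
package shim

import (
	"context"
	"sync"

	"github.com/containerd/containerd/events"
	"github.com/sirupsen/logrus"
)

type service struct {
	mu          sync.Mutex       // guards shared service state such as the containers map
	eventSendMu sync.Mutex       // serializes event emission; see sendL in the diff below
	events      chan interface{} // queue drained by forward()
}

// forward publishes every queued event to containerd under the topic
// derived from the event's concrete type.
func (s *service) forward(publisher events.Publisher) {
	for e := range s.events {
		if err := publisher.Publish(context.Background(), getTopic(context.Background(), e), e); err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
}

// getTopic is stubbed here; a fuller sketch accompanies its hunk below.
func getTopic(ctx context.Context, e interface{}) string { return "/tasks/?" }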
@@ -91,7 +91,8 @@ type exit struct {

 // service is the shim implementation of a remote shim over GRPC
 type service struct {
-	sync.Mutex
+	mu          sync.Mutex
+	eventSendMu sync.Mutex

 	// pid Since this shimv2 cannot get the container processes pid from VM,
 	// thus for the returned values needed pid, just return this shim's
@@ -211,6 +212,21 @@ func (s *service) forward(publisher events.Publisher) {
 	}
 }

+func (s *service) send(evt interface{}) {
+	// for unit test, it will not initialize s.events
+	if s.events != nil {
+		s.events <- evt
+	}
+}
+
+func (s *service) sendL(evt interface{}) {
+	s.eventSendMu.Lock()
+	if s.events != nil {
+		s.events <- evt
+	}
+	s.eventSendMu.Unlock()
+}
+
 func getTopic(ctx context.Context, e interface{}) string {
 	switch e.(type) {
 	case *eventstypes.TaskCreate:
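The hunk above shows only the first case of getTopic. A plausible completion, inferred from the events this diff sends and containerd's task topic constants (a sketch, not necessarily the exact function in the tree):

package shim

import (
	"context"

	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/runtime"
)

// getTopic maps each task event type emitted in this diff to the
// corresponding containerd topic constant.
func getTopic(ctx context.Context, e interface{}) string {
	switch e.(type) {
	case *eventstypes.TaskCreate:
		return runtime.TaskCreateEventTopic
	case *eventstypes.TaskStart:
		return runtime.TaskStartEventTopic
	case *eventstypes.TaskDelete:
		return runtime.TaskDeleteEventTopic
	case *eventstypes.TaskExit:
		return runtime.TaskExitEventTopic
	case *eventstypes.TaskPaused:
		return runtime.TaskPausedEventTopic
	case *eventstypes.TaskResumed:
		return runtime.TaskResumedEventTopic
	case *eventstypes.TaskExecAdded:
		return runtime.TaskExecAddedEventTopic
	case *eventstypes.TaskExecStarted:
		return runtime.TaskExecStartedEventTopic
	}
	return runtime.TaskUnknownTopic
}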
@@ -291,8 +307,8 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)

 // Create a new sandbox or container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	//the network namespace created by cni plugin
 	netns, err := namespaces.NamespaceRequired(ctx)
@@ -328,6 +344,20 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {

 	s.containers[r.ID] = container

+	s.send(&eventstypes.TaskCreate{
+		ContainerID: r.ID,
+		Bundle:      r.Bundle,
+		Rootfs:      r.Rootfs,
+		IO: &eventstypes.TaskIO{
+			Stdin:    r.Stdin,
+			Stdout:   r.Stdout,
+			Stderr:   r.Stderr,
+			Terminal: r.Terminal,
+		},
+		Checkpoint: r.Checkpoint,
+		Pid:        s.pid,
+	})
+
 	return &taskAPI.CreateTaskResponse{
 		Pid: s.pid,
 	}, nil
@@ -335,26 +365,39 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {

 // Start a process
 func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}

+	// hold the send lock so that the start events are sent before any exit events in the error case
+	s.eventSendMu.Lock()
+	defer s.eventSendMu.Unlock()
+
 	//start a container
 	if r.ExecID == "" {
 		err = startContainer(ctx, s, c)
 		if err != nil {
 			return nil, errdefs.ToGRPC(err)
 		}
+		s.send(&eventstypes.TaskStart{
+			ContainerID: c.id,
+			Pid:         s.pid,
+		})
 	} else {
 		//start an exec
 		_, err = startExec(ctx, s, r.ID, r.ExecID)
 		if err != nil {
 			return nil, errdefs.ToGRPC(err)
 		}
+		s.send(&eventstypes.TaskExecStarted{
+			ContainerID: c.id,
+			ExecID:      r.ExecID,
+			Pid:         s.pid,
+		})
 	}

 	return &taskAPI.StartResponse{
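The eventSendMu handling above is the crux of the ordering fix: Start holds the send lock across both the start operation and the TaskStart/TaskExecStarted send, so an exit watcher going through sendL cannot queue a TaskExit first, even if the container dies immediately. A minimal, self-contained demonstration of that guarantee (strings stand in for the real event types):

package main

import (
	"fmt"
	"sync"
)

type svc struct {
	eventSendMu sync.Mutex
	events      chan string
}

func (s *svc) send(evt string) { s.events <- evt }

func (s *svc) sendL(evt string) {
	s.eventSendMu.Lock()
	s.events <- evt
	s.eventSendMu.Unlock()
}

func main() {
	s := &svc{events: make(chan string, 2)}
	var wg sync.WaitGroup

	// Start(): hold the send lock across the start + event send.
	s.eventSendMu.Lock()
	wg.Add(1)
	go func() {
		// Exit watcher: the "container" dies immediately, but sendL
		// blocks on eventSendMu until Start has queued its event.
		defer wg.Done()
		s.sendL("TaskExit")
	}()
	s.send("TaskStart") // queued first, unconditionally
	s.eventSendMu.Unlock()

	wg.Wait()
	fmt.Println(<-s.events, <-s.events) // always: TaskStart TaskExit
}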
@@ -364,8 +407,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {

 // Delete the initial process and container
 func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -394,6 +437,13 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
 		}
 	}

+	s.send(&eventstypes.TaskDelete{
+		ContainerID: s.id,
+		Pid:         s.pid,
+		ExitStatus:  c.exit,
+		ExitedAt:    c.time,
+	})
+
 	return &taskAPI.DeleteResponse{
 		ExitStatus: c.exit,
 		ExitedAt:   c.time,
@@ -417,8 +467,8 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {

 // Exec an additional process inside the container
 func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -436,13 +486,18 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {

 	c.execs[r.ExecID] = execs

+	s.send(&eventstypes.TaskExecAdded{
+		ContainerID: c.id,
+		ExecID:      r.ExecID,
+	})
+
 	return empty, nil
 }

 // ResizePty of a process
 func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -471,8 +526,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {

 // State returns runtime state information for a process
 func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -515,8 +570,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {

 // Pause the container
 func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -536,13 +591,17 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
 		c.status = task.StatusUnknown
 	}

+	s.send(&eventstypes.TaskPaused{
+		c.id,
+	})
+
 	return empty, err
 }

 // Resume the container
 func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -560,13 +619,17 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
 		c.status = task.StatusUnknown
 	}

+	s.send(&eventstypes.TaskResumed{
+		c.id,
+	})
+
 	return empty, err
 }

 // Kill a process with the provided signal
 func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	signum := syscall.Signal(r.Signal)

@@ -617,8 +680,8 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {

 // CloseIO of a process
 func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -650,8 +713,8 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {

 // Connect returns shim information such as the shim's pid
 func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	return &taskAPI.ConnectResponse{
 		ShimPid: s.pid,
@@ -661,12 +724,12 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
 }

 func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
-	s.Lock()
+	s.mu.Lock()
 	if len(s.containers) != 0 {
-		s.Unlock()
+		s.mu.Unlock()
 		return empty, nil
 	}
-	s.Unlock()
+	s.mu.Unlock()

 	os.Exit(0)

@@ -676,8 +739,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
 }

 func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	c, err := s.getContainer(r.ID)
 	if err != nil {
@@ -696,8 +759,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {

 // Update a running container
 func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	var resources *specs.LinuxResources
 	v, err := typeurl.UnmarshalAny(r.Resources)
@@ -721,9 +784,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
 func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
 	var ret uint32

-	s.Lock()
+	s.mu.Lock()
 	c, err := s.getContainer(r.ID)
-	s.Unlock()
+	s.mu.Unlock()

 	if err != nil {
 		return nil, err
@@ -760,20 +823,22 @@ func (s *service) processExits() {
 	}
 }

 func (s *service) checkProcesses(e exit) {
-	s.Lock()
-	defer s.Unlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	id := e.execid
 	if id == "" {
 		id = e.id
 	}
-	s.events <- &eventstypes.TaskExit{
+
+	s.sendL(&eventstypes.TaskExit{
 		ContainerID: e.id,
 		ID:          id,
 		Pid:         e.pid,
 		ExitStatus:  uint32(e.status),
 		ExitedAt:    e.timestamp,
-	}
+	})

 	return
 }
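For completeness, the exit pipeline feeding checkProcesses plausibly has the shape below; the exit struct fields are inferred from their use in the TaskExit event above, and the s.ec channel name plus the stubs are assumptions, not code from this commit.

// Sketch only: inferred shape of the exit pipeline.
package shim

import "time"

// exit describes a single process exit, as consumed by checkProcesses.
// Field types follow their use above: pid is passed through as uint32,
// status is converted with uint32(e.status), timestamp is the exit time.
type exit struct {
	id        string // container ID
	execid    string // exec ID; empty for the container's init process
	pid       uint32
	status    int
	timestamp time.Time
}

type service struct {
	ec chan exit // exit notifications from the VM/agent (assumed name)
}

// processExits drains the exit channel; checkProcesses (in the diff
// above) then emits the TaskExit event via sendL.
func (s *service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

func (s *service) checkProcesses(e exit) { /* see diff above */ }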