diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 08595505f6..7ed3971ae3 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -91,7 +91,8 @@ type exit struct { // service is the shim implementation of a remote shim over GRPC type service struct { - sync.Mutex + mu sync.Mutex + eventSendMu sync.Mutex // pid Since this shimv2 cannot get the container processes pid from VM, // thus for the returned values needed pid, just return this shim's @@ -211,6 +212,21 @@ func (s *service) forward(publisher events.Publisher) { } } +func (s *service) send(evt interface{}) { + // for unit test, it will not initialize s.events + if s.events != nil { + s.events <- evt + } +} + +func (s *service) sendL(evt interface{}) { + s.eventSendMu.Lock() + if s.events != nil { + s.events <- evt + } + s.eventSendMu.Unlock() +} + func getTopic(ctx context.Context, e interface{}) string { switch e.(type) { case *eventstypes.TaskCreate: @@ -291,8 +307,8 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) // Create a new sandbox or container with the underlying OCI runtime func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() //the network namespace created by cni plugin netns, err := namespaces.NamespaceRequired(ctx) @@ -328,6 +344,20 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * s.containers[r.ID] = container + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: s.pid, + }) + return &taskAPI.CreateTaskResponse{ Pid: s.pid, }, nil @@ -335,26 +365,39 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { return nil, err } + // hold the send lock so that the start events are sent before any exit events in the error case + s.eventSendMu.Lock() + defer s.eventSendMu.Unlock() + //start a container if r.ExecID == "" { err = startContainer(ctx, s, c) if err != nil { return nil, errdefs.ToGRPC(err) } + s.send(&eventstypes.TaskStart{ + ContainerID: c.id, + Pid: s.pid, + }) } else { //start an exec _, err = startExec(ctx, s, r.ID, r.ExecID) if err != nil { return nil, errdefs.ToGRPC(err) } + s.send(&eventstypes.TaskExecStarted{ + ContainerID: c.id, + ExecID: r.ExecID, + Pid: s.pid, + }) } return &taskAPI.StartResponse{ @@ -364,8 +407,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. // Delete the initial process and container func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -394,6 +437,13 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP } } + s.send(&eventstypes.TaskDelete{ + ContainerID: s.id, + Pid: s.pid, + ExitStatus: c.exit, + ExitedAt: c.time, + }) + return &taskAPI.DeleteResponse{ ExitStatus: c.exit, ExitedAt: c.time, @@ -417,8 +467,8 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP // Exec an additional process inside the container func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -436,13 +486,18 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty c.execs[r.ExecID] = execs + s.send(&eventstypes.TaskExecAdded{ + ContainerID: c.id, + ExecID: r.ExecID, + }) + return empty, nil } // ResizePty of a process func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -471,8 +526,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (* // State returns runtime state information for a process func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -515,8 +570,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. // Pause the container func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -536,13 +591,17 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E c.status = task.StatusUnknown } + s.send(&eventstypes.TaskPaused{ + c.id, + }) + return empty, err } // Resume the container func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -560,13 +619,17 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes c.status = task.StatusUnknown } + s.send(&eventstypes.TaskResumed{ + c.id, + }) + return empty, err } // Kill a process with the provided signal func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() signum := syscall.Signal(r.Signal) @@ -617,8 +680,8 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi // CloseIO of a process func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -650,8 +713,8 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque // Connect returns shim information such as the shim's pid func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() return &taskAPI.ConnectResponse{ ShimPid: s.pid, @@ -661,12 +724,12 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task } func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { - s.Lock() + s.mu.Lock() if len(s.containers) != 0 { - s.Unlock() + s.mu.Unlock() return empty, nil } - s.Unlock() + s.mu.Unlock() os.Exit(0) @@ -676,8 +739,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt } func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() c, err := s.getContainer(r.ID) if err != nil { @@ -696,8 +759,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // Update a running container func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() var resources *specs.LinuxResources v, err := typeurl.UnmarshalAny(r.Resources) @@ -721,9 +784,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { var ret uint32 - s.Lock() + s.mu.Lock() c, err := s.getContainer(r.ID) - s.Unlock() + s.mu.Unlock() if err != nil { return nil, err @@ -760,20 +823,22 @@ func (s *service) processExits() { } func (s *service) checkProcesses(e exit) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() id := e.execid if id == "" { id = e.id } - s.events <- &eventstypes.TaskExit{ + + s.sendL(&eventstypes.TaskExit{ ContainerID: e.id, ID: id, Pid: e.pid, ExitStatus: uint32(e.status), ExitedAt: e.timestamp, - } + }) + return }