Mirror of https://github.com/kata-containers/kata-containers.git
shimv2: convert vc errors to grpc errors
containerd checks the grpc error code to determine the correct recovery action upon grpc errors, so we need to provide the codes properly. Unfortunately, ttrpc does not support grpc interceptors, so we have to modify every service function to do the conversion.

Fixes: #1527

Signed-off-by: Peng Tao <bergwolf@hyper.sh>
commit 8215a3ce9a (parent cf90751638)
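Reviewer note: because ttrpc has no interceptor hook, the conversion is done with a deferred assignment to each handler's named error return. Below is a minimal, self-contained sketch of that pattern and of how a caller can then branch on the resulting grpc code. toGRPCSketch, errNotFound and deleteTask are illustrative stand-ins, not the shim's real API.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// errNotFound stands in for a virtcontainers "no such container" error.
var errNotFound = errors.New("container abc does not exist")

// toGRPCSketch mimics the shape of the toGRPC helper for a single case:
// pass through errors that already carry a grpc code, map known errors,
// and leave everything else untouched.
func toGRPCSketch(err error) error {
	if err == nil {
		return nil
	}
	if _, ok := status.FromError(err); ok {
		return err // already a grpc status error
	}
	if err == errNotFound {
		return status.Errorf(codes.NotFound, err.Error())
	}
	return err
}

// deleteTask mimics a shim service handler: the deferred closure rewrites the
// named return value err, which is why the signatures in this commit change
// from (*T, error) to (_ *T, err error).
func deleteTask() (err error) {
	defer func() {
		err = toGRPCSketch(err)
	}()
	return errNotFound
}

func main() {
	err := deleteTask()
	// A caller such as containerd can now choose its recovery action by code.
	if status.Code(err) == codes.NotFound {
		fmt.Println("task already gone; treat the delete as a no-op")
	}
}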
containerd-shim-v2/errors.go (new file, 62 lines)
@@ -0,0 +1,62 @@
+// Copyright (c) 2019 hyper.sh
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+package containerdshim
+
+import (
+	"strings"
+	"syscall"
+
+	"github.com/pkg/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	vc "github.com/kata-containers/runtime/virtcontainers/pkg/types"
+)
+
+// toGRPC maps the virtcontainers error into a grpc error,
+// using the original error message as a description.
+func toGRPC(err error) error {
+	if err == nil {
+		return nil
+	}
+
+	if isGRPCError(err) {
+		// error has already been mapped to grpc
+		return err
+	}
+
+	err = errors.Cause(err)
+	switch {
+	case isInvalidArgument(err):
+		return status.Errorf(codes.InvalidArgument, err.Error())
+	case isNotFound(err):
+		return status.Errorf(codes.NotFound, err.Error())
+	}
+
+	return err
+}
+
+// toGRPCf maps the error to grpc error codes, assembling the formatting string
+// and combining it with the target error string.
+func toGRPCf(err error, format string, args ...interface{}) error {
+	return toGRPC(errors.Wrapf(err, format, args...))
+}
+
+func isGRPCError(err error) bool {
+	_, ok := status.FromError(err)
+	return ok
+}
+
+func isInvalidArgument(err error) bool {
+	return err == vc.ErrNeedSandbox || err == vc.ErrNeedSandboxID ||
+		err == vc.ErrNeedContainerID || err == vc.ErrNeedState ||
+		err == syscall.EINVAL
+}
+
+func isNotFound(err error) bool {
+	return err == vc.ErrNoSuchContainer || err == syscall.ENOENT ||
+		strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "not exist")
+}
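As a quick sanity check of the mapping above, a hypothetical extra test (not part of this commit) could assert the codes produced for two representative inputs; it assumes it sits next to errors.go in package containerdshim.

package containerdshim

import (
	"syscall"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// TestToGRPCCodes is a sketch only: it checks that toGRPC maps EINVAL to
// InvalidArgument and ENOENT to NotFound, matching what containerd expects.
func TestToGRPCCodes(t *testing.T) {
	if c := status.Code(toGRPC(syscall.EINVAL)); c != codes.InvalidArgument {
		t.Errorf("expected InvalidArgument, got %v", c)
	}
	if c := status.Code(toGRPC(syscall.ENOENT)); c != codes.NotFound {
		t.Errorf("expected NotFound, got %v", c)
	}
}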
containerd-shim-v2/errors_test.go (new file, 29 lines)
@@ -0,0 +1,29 @@
+// Copyright (c) 2019 hyper.sh
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+package containerdshim
+
+import (
+	"syscall"
+	"testing"
+
+	vc "github.com/kata-containers/runtime/virtcontainers/pkg/types"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestToGRPC(t *testing.T) {
+	assert := assert.New(t)
+
+	for _, err := range []error{vc.ErrNeedSandbox, vc.ErrNeedSandboxID,
+		vc.ErrNeedContainerID, vc.ErrNeedState, syscall.EINVAL, vc.ErrNoSuchContainer, syscall.ENOENT} {
+		assert.False(isGRPCError(err))
+		err = toGRPC(err)
+		assert.True(isGRPCError(err))
+		err = toGRPC(err)
+		assert.True(isGRPCError(err))
+		err = toGRPCf(err, "appending")
+		assert.True(isGRPCError(err))
+	}
+}
@@ -258,13 +258,17 @@ func getTopic(ctx context.Context, e interface{}) string {
 	return cdruntime.TaskUnknownTopic
 }
 
-func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
+func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) {
 	//Since the binary cleanup will return the DeleteResponse from stdout to
 	//containerd, thus we must make sure there is no any outputs in stdout except
 	//the returned response, thus here redirect the log to stderr in case there's
 	//any log output to stdout.
 	logrus.SetOutput(os.Stderr)
 
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	if s.id == "" {
 		return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "the container id is empty, please specify the container id")
 	}
@@ -310,6 +314,10 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
 
 // Create a new sandbox or container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -351,7 +359,11 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 }
 
 // Start a process
-func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
+func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -393,7 +405,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 }
 
 // Delete the initial process and container
-func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
+func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -453,7 +469,11 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
 }
 
 // Exec an additional process inside the container
-func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
+func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -482,7 +502,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
 }
 
 // ResizePty of a process
-func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
+func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -512,7 +536,11 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
 }
 
 // State returns runtime state information for a process
-func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
+func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -556,7 +584,11 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
 }
 
 // Pause the container
-func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
+func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -586,7 +618,11 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
 }
 
 // Resume the container
-func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
+func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -614,7 +650,11 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
 }
 
 // Kill a process with the provided signal
-func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
+func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -640,9 +680,13 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
 // Pids returns all pids inside the container
 // Since for kata, it cannot get the process's pid from VM,
 // thus only return the Shim's pid directly.
-func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
+func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) {
 	var processes []*task.ProcessInfo
 
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	pInfo := task.ProcessInfo{
 		Pid: s.pid,
 	}
@@ -654,7 +698,11 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
 }
 
 // CloseIO of a process
-func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
+func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -682,12 +730,20 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
 }
 
 // Checkpoint the container
-func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "service Checkpoint")
 }
 
 // Connect returns shim information such as the shim's pid
-func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
+func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -698,7 +754,11 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
 	}, nil
 }
 
-func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	if len(s.containers) != 0 {
 		s.mu.Unlock()
@@ -713,7 +773,11 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
 	return empty, nil
 }
 
-func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
+func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -733,7 +797,11 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
 }
 
 // Update a running container
-func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) {
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -756,9 +824,13 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
 }
 
 // Wait for a process to exit
-func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
+func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) {
 	var ret uint32
 
+	defer func() {
+		err = toGRPC(err)
+	}()
+
 	s.mu.Lock()
 	c, err := s.getContainer(r.ID)
 	s.mu.Unlock()