mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-26 23:38:31 +00:00
Merge pull request #904 from cmaf/tracing-shimv2
shimv2: Add tracing to shimv2
This commit is contained in:
commit
b24a2d2e48
@ -25,6 +25,7 @@ import (
|
||||
"github.com/containerd/typeurl"
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/unix"
|
||||
@ -78,6 +79,21 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
||||
vci.SetLogger(ctx, shimLog)
|
||||
katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level)
|
||||
|
||||
// load runtime config so that tracing can start if enabled
|
||||
_, runtimeConfig, err := katautils.LoadConfiguration("", false, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create tracer
|
||||
_, err = katautils.CreateTracer("kata")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// create span
|
||||
span, ctx := trace(ctx, "New")
|
||||
defer span.Finish()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
s := &service{
|
||||
@ -85,6 +101,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
||||
pid: uint32(os.Getpid()),
|
||||
ctx: ctx,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
events: make(chan interface{}, chSize),
|
||||
ec: make(chan exit, bufferSize),
|
||||
cancel: cancel,
|
||||
@ -167,6 +184,13 @@ func newCommand(ctx context.Context, containerdBinary, id, containerdAddress str
|
||||
// StartShim will start a kata shimv2 daemon which will implement the
|
||||
// ShimV2 APIs such as create/start/update etc containers.
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
|
||||
// Stop tracing here since a new tracer will be created the next time New()
|
||||
// is called again after StartShim()
|
||||
defer katautils.StopTracing(s.ctx)
|
||||
|
||||
span, _ := trace(s.ctx, "StartShim")
|
||||
defer span.Finish()
|
||||
|
||||
bundlePath, err := os.Getwd()
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -278,7 +302,22 @@ func getTopic(e interface{}) string {
|
||||
return cdruntime.TaskUnknownTopic
|
||||
}
|
||||
|
||||
func trace(ctx context.Context, name string) (opentracing.Span, context.Context) {
|
||||
if ctx == nil {
|
||||
logrus.WithField("type", "bug").Error("trace called before context set")
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, name)
|
||||
span.SetTag("source", "runtime")
|
||||
|
||||
return span, ctx
|
||||
}
|
||||
|
||||
func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Cleanup")
|
||||
defer span.Finish()
|
||||
|
||||
//Since the binary cleanup will return the DeleteResponse from stdout to
|
||||
//containerd, thus we must make sure there are no outputs in stdout except
|
||||
//the returned response, thus here redirect the log to stderr in case there's
|
||||
@ -334,6 +373,9 @@ func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err e
|
||||
|
||||
// Create a new sandbox or container with the underlying OCI runtime
|
||||
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Create")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -387,6 +429,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
||||
|
||||
// Start a process
|
||||
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Start")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -435,6 +480,9 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAP
|
||||
|
||||
// Delete the initial process and container
|
||||
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Delete")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -484,6 +532,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *task
|
||||
|
||||
// Exec an additional process inside the container
|
||||
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Exec")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
rpcDurationsHistogram.WithLabelValues("exec").Observe(float64(time.Since(start).Nanoseconds() / int64(time.Millisecond)))
|
||||
@ -519,6 +570,9 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *p
|
||||
|
||||
// ResizePty of a process
|
||||
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "ResizePty")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -555,6 +609,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_
|
||||
|
||||
// State returns runtime state information for a process
|
||||
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) {
|
||||
span, _ := trace(s.ctx, "State")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -600,11 +657,13 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAP
|
||||
Terminal: execs.tty.terminal,
|
||||
ExitStatus: uint32(execs.exitCode),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
// Pause the container
|
||||
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Pause")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -641,6 +700,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes
|
||||
|
||||
// Resume the container
|
||||
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Resume")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -675,6 +737,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptyp
|
||||
|
||||
// Kill a process with the provided signal
|
||||
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Kill")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -733,6 +798,9 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E
|
||||
// Since for kata, it cannot get the process's pid from VM,
|
||||
// thus only return the Shim's pid directly.
|
||||
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Pids")
|
||||
defer span.Finish()
|
||||
|
||||
var processes []*task.ProcessInfo
|
||||
|
||||
start := time.Now()
|
||||
@ -753,6 +821,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.
|
||||
|
||||
// CloseIO of a process
|
||||
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "CloseIO")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -791,6 +862,9 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
|
||||
|
||||
// Checkpoint the container
|
||||
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Checkpoint")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -802,6 +876,9 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
|
||||
|
||||
// Connect returns shim information such as the shim's pid
|
||||
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Connect")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -819,6 +896,8 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *ta
|
||||
}
|
||||
|
||||
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Shutdown")
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -832,6 +911,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
span.Finish()
|
||||
katautils.StopTracing(s.ctx)
|
||||
|
||||
s.cancel()
|
||||
|
||||
os.Exit(0)
|
||||
@ -842,6 +924,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
||||
}
|
||||
|
||||
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Stats")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -868,6 +953,9 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAP
|
||||
|
||||
// Update a running container
|
||||
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Update")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@ -897,6 +985,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *
|
||||
|
||||
// Wait for a process to exit
|
||||
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Wait")
|
||||
defer span.Finish()
|
||||
|
||||
var ret uint32
|
||||
|
||||
start := time.Now()
|
||||
|
Loading…
Reference in New Issue
Block a user