diff --git a/src/runtime/containerd-shim-v2/service.go b/src/runtime/containerd-shim-v2/service.go index d6b9f9773a..54d0086129 100644 --- a/src/runtime/containerd-shim-v2/service.go +++ b/src/runtime/containerd-shim-v2/service.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" "github.com/opencontainers/runtime-spec/specs-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -78,6 +79,21 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi vci.SetLogger(ctx, shimLog) katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level) + // load runtime config so that tracing can start if enabled + _, runtimeConfig, err := katautils.LoadConfiguration("", false, true) + if err != nil { + return nil, err + } + + // create tracer + _, err = katautils.CreateTracer("kata") + if err != nil { + return nil, err + } + // create span + span, ctx := trace(ctx, "New") + defer span.Finish() + ctx, cancel := context.WithCancel(ctx) s := &service{ @@ -85,6 +101,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi pid: uint32(os.Getpid()), ctx: ctx, containers: make(map[string]*container), + config: &runtimeConfig, events: make(chan interface{}, chSize), ec: make(chan exit, bufferSize), cancel: cancel, @@ -167,6 +184,13 @@ func newCommand(ctx context.Context, containerdBinary, id, containerdAddress str // StartShim willl start a kata shimv2 daemon which will implemented the // ShimV2 APIs such as create/start/update etc containers. 
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { + // Stop tracing here since a new tracer will be created the next time New() + // is called again after StartShim() + defer katautils.StopTracing(s.ctx) + + span, _ := trace(s.ctx, "StartShim") + defer span.Finish() + bundlePath, err := os.Getwd() if err != nil { return "", err @@ -278,7 +302,22 @@ func getTopic(e interface{}) string { return cdruntime.TaskUnknownTopic } +// trace starts an opentracing span called name via StartSpanFromContext, tags it +// with source=runtime, and returns the span together with the derived context. +// A nil ctx is logged as a bug and replaced with context.Background(). +func trace(ctx context.Context, name string) (opentracing.Span, context.Context) { + if ctx == nil { + logrus.WithField("type", "bug").Error("trace called before context set") + ctx = context.Background() + } + + span, ctx := opentracing.StartSpanFromContext(ctx, name) + span.SetTag("source", "runtime") + + return span, ctx +} + func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) { + span, _ := trace(s.ctx, "Cleanup") + defer span.Finish() + //Since the binary cleanup will return the DeleteResponse from stdout to //containerd, thus we must make sure there is no any outputs in stdout except //the returned response, thus here redirect the log to stderr in case there's @@ -334,6 +373,9 @@ func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err e // Create a new sandbox or container with the underlying OCI runtime func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + span, _ := trace(s.ctx, "Create") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -387,6 +429,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) { + span, _ := trace(s.ctx, "Start") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -435,6 +480,9 @@ func (s *service) Start(ctx 
context.Context, r *taskAPI.StartRequest) (_ *taskAP // Delete the initial process and container func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) { + span, _ := trace(s.ctx, "Delete") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -484,6 +532,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *task // Exec an additional process inside the container func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Exec") + defer span.Finish() + start := time.Now() defer func() { rpcDurationsHistogram.WithLabelValues("exec").Observe(float64(time.Since(start).Nanoseconds() / int64(time.Millisecond))) @@ -519,6 +570,9 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *p // ResizePty of a process func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "ResizePty") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -555,6 +609,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ // State returns runtime state information for a process func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) { + span, _ := trace(s.ctx, "State") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -600,11 +657,13 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAP Terminal: execs.tty.terminal, ExitStatus: uint32(execs.exitCode), }, nil - } // Pause the container func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Pause") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -641,6 +700,9 @@ func (s *service) Pause(ctx 
context.Context, r *taskAPI.PauseRequest) (_ *ptypes // Resume the container func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Resume") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -675,6 +737,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptyp // Kill a process with the provided signal func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Kill") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -733,6 +798,9 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E // Since for kata, it cannot get the process's pid from VM, // thus only return the Shim's pid directly. func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) { + span, _ := trace(s.ctx, "Pids") + defer span.Finish() + var processes []*task.ProcessInfo start := time.Now() @@ -753,6 +821,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI. 
// CloseIO of a process func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "CloseIO") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -791,6 +862,9 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt // Checkpoint the container func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Checkpoint") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -802,6 +876,9 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque // Connect returns shim information such as the shim's pid func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) { + span, _ := trace(s.ctx, "Connect") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -819,6 +896,8 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *ta } func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Shutdown") + start := time.Now() defer func() { err = toGRPC(err) @@ -832,6 +911,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ * } s.mu.Unlock() + span.Finish() + katautils.StopTracing(s.ctx) + s.cancel() os.Exit(0) @@ -842,6 +924,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ * } func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) { + span, _ := trace(s.ctx, "Stats") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -868,6 +953,9 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAP // Update a running container func (s *service) Update(ctx context.Context, r 
*taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Update") + defer span.Finish() + start := time.Now() defer func() { err = toGRPC(err) @@ -897,6 +985,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ * // Wait for a process to exit func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) { + span, _ := trace(s.ctx, "Wait") + defer span.Finish() + var ret uint32 start := time.Now()