mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-30 04:34:27 +00:00
shim: log events for CRI-O
CRI-O start shim process without setting TTRPC_ADDRESS, that the forwarding events goroutine will get errors. For CRI-O runtime, we can log the events to log file. Fixes: #3733 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
a671b455a2
commit
f6fc1621f7
88
src/runtime/pkg/containerd-shim-v2/event_forwarder.go
Normal file
88
src/runtime/pkg/containerd-shim-v2/event_forwarder.go
Normal file
@ -0,0 +1,88 @@
|
||||
// Copyright (c) 2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/events"
|
||||
)
|
||||
|
||||
type forwarderType string
|
||||
|
||||
const (
|
||||
forwarderTypeLog forwarderType = "log"
|
||||
forwarderTypeContainerd forwarderType = "containerd"
|
||||
|
||||
// A time span used to wait for publish a containerd event,
|
||||
// once it costs a longer time than timeOut, it will be canceld.
|
||||
timeOut = 5 * time.Second
|
||||
|
||||
// ttrpc address passed from container runtime.
|
||||
// For now containerd will pass the address, and CRI-O will not
|
||||
ttrpcAddressEnv = "TTRPC_ADDRESS"
|
||||
)
|
||||
|
||||
type eventsForwarder interface {
|
||||
forward()
|
||||
forwarderType() forwarderType
|
||||
}
|
||||
|
||||
type logForwarder struct {
|
||||
s *service
|
||||
}
|
||||
|
||||
func (lf *logForwarder) forward() {
|
||||
for e := range lf.s.events {
|
||||
shimLog.WithField("topic", getTopic(e)).Infof("post event: %+v", e)
|
||||
}
|
||||
}
|
||||
|
||||
func (lf *logForwarder) forwarderType() forwarderType {
|
||||
return forwarderTypeLog
|
||||
}
|
||||
|
||||
type containerdForwarder struct {
|
||||
s *service
|
||||
ctx context.Context
|
||||
publisher events.Publisher
|
||||
}
|
||||
|
||||
func (cf *containerdForwarder) forward() {
|
||||
for e := range cf.s.events {
|
||||
ctx, cancel := context.WithTimeout(cf.ctx, timeOut)
|
||||
err := cf.publisher.Publish(ctx, getTopic(e), e)
|
||||
cancel()
|
||||
if err != nil {
|
||||
shimLog.WithError(err).Error("post event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cf *containerdForwarder) forwarderType() forwarderType {
|
||||
return forwarderTypeContainerd
|
||||
}
|
||||
|
||||
func (s *service) newEventsForwarder(ctx context.Context, publisher events.Publisher) eventsForwarder {
|
||||
var forwarder eventsForwarder
|
||||
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||
if ttrpcAddress == "" {
|
||||
// non containerd will use log forwarder to write events to log
|
||||
forwarder = &logForwarder{
|
||||
s: s,
|
||||
}
|
||||
} else {
|
||||
forwarder = &containerdForwarder{
|
||||
s: s,
|
||||
ctx: ctx,
|
||||
publisher: publisher,
|
||||
}
|
||||
}
|
||||
|
||||
return forwarder
|
||||
}
|
45
src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go
Normal file
45
src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go
Normal file
@ -0,0 +1,45 @@
|
||||
// Copyright (c) 2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNewEventsForwarder(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
sandbox := &vcmock.Sandbox{
|
||||
MockID: testSandboxID,
|
||||
}
|
||||
|
||||
s := &service{
|
||||
id: testSandboxID,
|
||||
sandbox: sandbox,
|
||||
containers: make(map[string]*container),
|
||||
}
|
||||
|
||||
// newEventsForwarder will not call publisher to publish events
|
||||
// so here we can use a nil pointer to test newEventsForwarder
|
||||
var publisher events.Publisher
|
||||
|
||||
// check log forwarder
|
||||
forwarder := s.newEventsForwarder(context.Background(), publisher)
|
||||
assert.Equal(forwarderTypeLog, forwarder.forwarderType())
|
||||
|
||||
// check containerd forwarder
|
||||
os.Setenv(ttrpcAddressEnv, "/foo/bar.sock")
|
||||
defer os.Setenv(ttrpcAddressEnv, "")
|
||||
forwarder = s.newEventsForwarder(context.Background(), publisher)
|
||||
assert.Equal(forwarderTypeContainerd, forwarder.forwarderType())
|
||||
}
|
@ -17,7 +17,6 @@ import (
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
cdruntime "github.com/containerd/containerd/runtime"
|
||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||
@ -51,10 +50,6 @@ const (
|
||||
|
||||
chSize = 128
|
||||
exitCode255 = 255
|
||||
|
||||
// A time span used to wait for publish a containerd event,
|
||||
// once it costs a longer time than timeOut, it will be canceld.
|
||||
timeOut = 5 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@ -100,7 +95,8 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
|
||||
|
||||
go s.processExits()
|
||||
|
||||
go s.forward(ctx, publisher)
|
||||
forwarder := s.newEventsForwarder(ctx, publisher)
|
||||
go forwarder.forward()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
@ -256,17 +252,6 @@ func (s *service) StartShim(ctx context.Context, opts cdshim.StartOpts) (_ strin
|
||||
return address, nil
|
||||
}
|
||||
|
||||
func (s *service) forward(ctx context.Context, publisher events.Publisher) {
|
||||
for e := range s.events {
|
||||
ctx, cancel := context.WithTimeout(ctx, timeOut)
|
||||
err := publisher.Publish(ctx, getTopic(e), e)
|
||||
cancel()
|
||||
if err != nil {
|
||||
shimLog.WithError(err).Error("post event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) send(evt interface{}) {
|
||||
// for unit test, it will not initialize s.events
|
||||
if s.events != nil {
|
||||
|
@ -356,6 +356,7 @@ func (c *Container) Logger() *logrus.Entry {
|
||||
return virtLog.WithFields(logrus.Fields{
|
||||
"subsystem": "container",
|
||||
"sandbox": c.sandboxID,
|
||||
"container": c.id,
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user