shim: log events for CRI-O

CRI-O start shim process without setting TTRPC_ADDRESS,
that the forwarding events goroutine will get errors.

For CRI-O runtime, we can log the events to log file.

Fixes: #3733

Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
bin 2022-02-21 17:25:00 +08:00
parent a671b455a2
commit f6fc1621f7
4 changed files with 136 additions and 17 deletions

View File

@ -0,0 +1,88 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"os"
"time"
"github.com/containerd/containerd/events"
)
type forwarderType string
const (
forwarderTypeLog forwarderType = "log"
forwarderTypeContainerd forwarderType = "containerd"
// A time span used to wait for publish a containerd event,
// once it costs a longer time than timeOut, it will be canceld.
timeOut = 5 * time.Second
// ttrpc address passed from container runtime.
// For now containerd will pass the address, and CRI-O will not
ttrpcAddressEnv = "TTRPC_ADDRESS"
)
type eventsForwarder interface {
forward()
forwarderType() forwarderType
}
type logForwarder struct {
s *service
}
func (lf *logForwarder) forward() {
for e := range lf.s.events {
shimLog.WithField("topic", getTopic(e)).Infof("post event: %+v", e)
}
}
func (lf *logForwarder) forwarderType() forwarderType {
return forwarderTypeLog
}
type containerdForwarder struct {
s *service
ctx context.Context
publisher events.Publisher
}
func (cf *containerdForwarder) forward() {
for e := range cf.s.events {
ctx, cancel := context.WithTimeout(cf.ctx, timeOut)
err := cf.publisher.Publish(ctx, getTopic(e), e)
cancel()
if err != nil {
shimLog.WithError(err).Error("post event")
}
}
}
func (cf *containerdForwarder) forwarderType() forwarderType {
return forwarderTypeContainerd
}
func (s *service) newEventsForwarder(ctx context.Context, publisher events.Publisher) eventsForwarder {
var forwarder eventsForwarder
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
if ttrpcAddress == "" {
// non containerd will use log forwarder to write events to log
forwarder = &logForwarder{
s: s,
}
} else {
forwarder = &containerdForwarder{
s: s,
ctx: ctx,
publisher: publisher,
}
}
return forwarder
}

View File

@ -0,0 +1,45 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"os"
"testing"
"github.com/containerd/containerd/events"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/stretchr/testify/assert"
)
func TestNewEventsForwarder(t *testing.T) {
assert := assert.New(t)
sandbox := &vcmock.Sandbox{
MockID: testSandboxID,
}
s := &service{
id: testSandboxID,
sandbox: sandbox,
containers: make(map[string]*container),
}
// newEventsForwarder will not call publisher to publish events
// so here we can use a nil pointer to test newEventsForwarder
var publisher events.Publisher
// check log forwarder
forwarder := s.newEventsForwarder(context.Background(), publisher)
assert.Equal(forwarderTypeLog, forwarder.forwarderType())
// check containerd forwarder
os.Setenv(ttrpcAddressEnv, "/foo/bar.sock")
defer os.Setenv(ttrpcAddressEnv, "")
forwarder = s.newEventsForwarder(context.Background(), publisher)
assert.Equal(forwarderTypeContainerd, forwarder.forwarderType())
}

View File

@ -17,7 +17,6 @@ import (
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
cdruntime "github.com/containerd/containerd/runtime" cdruntime "github.com/containerd/containerd/runtime"
cdshim "github.com/containerd/containerd/runtime/v2/shim" cdshim "github.com/containerd/containerd/runtime/v2/shim"
@ -51,10 +50,6 @@ const (
chSize = 128 chSize = 128
exitCode255 = 255 exitCode255 = 255
// A time span used to wait for publish a containerd event,
// once it costs a longer time than timeOut, it will be canceld.
timeOut = 5 * time.Second
) )
var ( var (
@ -100,7 +95,8 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
go s.processExits() go s.processExits()
go s.forward(ctx, publisher) forwarder := s.newEventsForwarder(ctx, publisher)
go forwarder.forward()
return s, nil return s, nil
} }
@ -256,17 +252,6 @@ func (s *service) StartShim(ctx context.Context, opts cdshim.StartOpts) (_ strin
return address, nil return address, nil
} }
func (s *service) forward(ctx context.Context, publisher events.Publisher) {
for e := range s.events {
ctx, cancel := context.WithTimeout(ctx, timeOut)
err := publisher.Publish(ctx, getTopic(e), e)
cancel()
if err != nil {
shimLog.WithError(err).Error("post event")
}
}
}
func (s *service) send(evt interface{}) { func (s *service) send(evt interface{}) {
// for unit test, it will not initialize s.events // for unit test, it will not initialize s.events
if s.events != nil { if s.events != nil {

View File

@ -356,6 +356,7 @@ func (c *Container) Logger() *logrus.Entry {
return virtLog.WithFields(logrus.Fields{ return virtLog.WithFields(logrus.Fields{
"subsystem": "container", "subsystem": "container",
"sandbox": c.sandboxID, "sandbox": c.sandboxID,
"container": c.id,
}) })
} }