mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-30 20:54:26 +00:00
shim: log events for CRI-O
CRI-O start shim process without setting TTRPC_ADDRESS, that the forwarding events goroutine will get errors. For CRI-O runtime, we can log the events to log file. Fixes: #3733 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
a671b455a2
commit
f6fc1621f7
88
src/runtime/pkg/containerd-shim-v2/event_forwarder.go
Normal file
88
src/runtime/pkg/containerd-shim-v2/event_forwarder.go
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
// Copyright (c) 2022 Ant Group
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
//
|
||||||
|
|
||||||
|
package containerdshim
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/events"
|
||||||
|
)
|
||||||
|
|
||||||
|
type forwarderType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
forwarderTypeLog forwarderType = "log"
|
||||||
|
forwarderTypeContainerd forwarderType = "containerd"
|
||||||
|
|
||||||
|
// A time span used to wait for publish a containerd event,
|
||||||
|
// once it costs a longer time than timeOut, it will be canceld.
|
||||||
|
timeOut = 5 * time.Second
|
||||||
|
|
||||||
|
// ttrpc address passed from container runtime.
|
||||||
|
// For now containerd will pass the address, and CRI-O will not
|
||||||
|
ttrpcAddressEnv = "TTRPC_ADDRESS"
|
||||||
|
)
|
||||||
|
|
||||||
|
type eventsForwarder interface {
|
||||||
|
forward()
|
||||||
|
forwarderType() forwarderType
|
||||||
|
}
|
||||||
|
|
||||||
|
type logForwarder struct {
|
||||||
|
s *service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lf *logForwarder) forward() {
|
||||||
|
for e := range lf.s.events {
|
||||||
|
shimLog.WithField("topic", getTopic(e)).Infof("post event: %+v", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lf *logForwarder) forwarderType() forwarderType {
|
||||||
|
return forwarderTypeLog
|
||||||
|
}
|
||||||
|
|
||||||
|
type containerdForwarder struct {
|
||||||
|
s *service
|
||||||
|
ctx context.Context
|
||||||
|
publisher events.Publisher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cf *containerdForwarder) forward() {
|
||||||
|
for e := range cf.s.events {
|
||||||
|
ctx, cancel := context.WithTimeout(cf.ctx, timeOut)
|
||||||
|
err := cf.publisher.Publish(ctx, getTopic(e), e)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
shimLog.WithError(err).Error("post event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cf *containerdForwarder) forwarderType() forwarderType {
|
||||||
|
return forwarderTypeContainerd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *service) newEventsForwarder(ctx context.Context, publisher events.Publisher) eventsForwarder {
|
||||||
|
var forwarder eventsForwarder
|
||||||
|
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||||
|
if ttrpcAddress == "" {
|
||||||
|
// non containerd will use log forwarder to write events to log
|
||||||
|
forwarder = &logForwarder{
|
||||||
|
s: s,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
forwarder = &containerdForwarder{
|
||||||
|
s: s,
|
||||||
|
ctx: ctx,
|
||||||
|
publisher: publisher,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return forwarder
|
||||||
|
}
|
45
src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go
Normal file
45
src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
// Copyright (c) 2022 Ant Group
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
//
|
||||||
|
|
||||||
|
package containerdshim
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/events"
|
||||||
|
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewEventsForwarder(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
sandbox := &vcmock.Sandbox{
|
||||||
|
MockID: testSandboxID,
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &service{
|
||||||
|
id: testSandboxID,
|
||||||
|
sandbox: sandbox,
|
||||||
|
containers: make(map[string]*container),
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEventsForwarder will not call publisher to publish events
|
||||||
|
// so here we can use a nil pointer to test newEventsForwarder
|
||||||
|
var publisher events.Publisher
|
||||||
|
|
||||||
|
// check log forwarder
|
||||||
|
forwarder := s.newEventsForwarder(context.Background(), publisher)
|
||||||
|
assert.Equal(forwarderTypeLog, forwarder.forwarderType())
|
||||||
|
|
||||||
|
// check containerd forwarder
|
||||||
|
os.Setenv(ttrpcAddressEnv, "/foo/bar.sock")
|
||||||
|
defer os.Setenv(ttrpcAddressEnv, "")
|
||||||
|
forwarder = s.newEventsForwarder(context.Background(), publisher)
|
||||||
|
assert.Equal(forwarderTypeContainerd, forwarder.forwarderType())
|
||||||
|
}
|
@ -17,7 +17,6 @@ import (
|
|||||||
eventstypes "github.com/containerd/containerd/api/events"
|
eventstypes "github.com/containerd/containerd/api/events"
|
||||||
"github.com/containerd/containerd/api/types/task"
|
"github.com/containerd/containerd/api/types/task"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
cdruntime "github.com/containerd/containerd/runtime"
|
cdruntime "github.com/containerd/containerd/runtime"
|
||||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||||
@ -51,10 +50,6 @@ const (
|
|||||||
|
|
||||||
chSize = 128
|
chSize = 128
|
||||||
exitCode255 = 255
|
exitCode255 = 255
|
||||||
|
|
||||||
// A time span used to wait for publish a containerd event,
|
|
||||||
// once it costs a longer time than timeOut, it will be canceld.
|
|
||||||
timeOut = 5 * time.Second
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -100,7 +95,8 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
|
|||||||
|
|
||||||
go s.processExits()
|
go s.processExits()
|
||||||
|
|
||||||
go s.forward(ctx, publisher)
|
forwarder := s.newEventsForwarder(ctx, publisher)
|
||||||
|
go forwarder.forward()
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
@ -256,17 +252,6 @@ func (s *service) StartShim(ctx context.Context, opts cdshim.StartOpts) (_ strin
|
|||||||
return address, nil
|
return address, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) forward(ctx context.Context, publisher events.Publisher) {
|
|
||||||
for e := range s.events {
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, timeOut)
|
|
||||||
err := publisher.Publish(ctx, getTopic(e), e)
|
|
||||||
cancel()
|
|
||||||
if err != nil {
|
|
||||||
shimLog.WithError(err).Error("post event")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *service) send(evt interface{}) {
|
func (s *service) send(evt interface{}) {
|
||||||
// for unit test, it will not initialize s.events
|
// for unit test, it will not initialize s.events
|
||||||
if s.events != nil {
|
if s.events != nil {
|
||||||
|
@ -356,6 +356,7 @@ func (c *Container) Logger() *logrus.Entry {
|
|||||||
return virtLog.WithFields(logrus.Fields{
|
return virtLog.WithFields(logrus.Fields{
|
||||||
"subsystem": "container",
|
"subsystem": "container",
|
||||||
"sandbox": c.sandboxID,
|
"sandbox": c.sandboxID,
|
||||||
|
"container": c.id,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user