diff --git a/src/runtime/pkg/containerd-shim-v2/event_forwarder.go b/src/runtime/pkg/containerd-shim-v2/event_forwarder.go new file mode 100644 index 0000000000..a77645a1fd --- /dev/null +++ b/src/runtime/pkg/containerd-shim-v2/event_forwarder.go @@ -0,0 +1,88 @@ +// Copyright (c) 2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "os" + "time" + + "github.com/containerd/containerd/events" +) + +type forwarderType string + +const ( + forwarderTypeLog forwarderType = "log" + forwarderTypeContainerd forwarderType = "containerd" + + // A time span used to wait for publish a containerd event, + // once it costs a longer time than timeOut, it will be canceld. + timeOut = 5 * time.Second + + // ttrpc address passed from container runtime. + // For now containerd will pass the address, and CRI-O will not + ttrpcAddressEnv = "TTRPC_ADDRESS" +) + +type eventsForwarder interface { + forward() + forwarderType() forwarderType +} + +type logForwarder struct { + s *service +} + +func (lf *logForwarder) forward() { + for e := range lf.s.events { + shimLog.WithField("topic", getTopic(e)).Infof("post event: %+v", e) + } +} + +func (lf *logForwarder) forwarderType() forwarderType { + return forwarderTypeLog +} + +type containerdForwarder struct { + s *service + ctx context.Context + publisher events.Publisher +} + +func (cf *containerdForwarder) forward() { + for e := range cf.s.events { + ctx, cancel := context.WithTimeout(cf.ctx, timeOut) + err := cf.publisher.Publish(ctx, getTopic(e), e) + cancel() + if err != nil { + shimLog.WithError(err).Error("post event") + } + } +} + +func (cf *containerdForwarder) forwarderType() forwarderType { + return forwarderTypeContainerd +} + +func (s *service) newEventsForwarder(ctx context.Context, publisher events.Publisher) eventsForwarder { + var forwarder eventsForwarder + ttrpcAddress := os.Getenv(ttrpcAddressEnv) + if ttrpcAddress == "" { + // non containerd will use log forwarder to write events to log + forwarder = &logForwarder{ + s: s, + } + } else { + forwarder = &containerdForwarder{ + s: s, + ctx: ctx, + publisher: publisher, + } + } + + return forwarder +} diff --git a/src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go b/src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go new file mode 100644 index 0000000000..979ce5f1e0 --- /dev/null +++ b/src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "os" + "testing" + + "github.com/containerd/containerd/events" + "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock" + + "github.com/stretchr/testify/assert" +) + +func TestNewEventsForwarder(t *testing.T) { + assert := assert.New(t) + + sandbox := &vcmock.Sandbox{ + MockID: testSandboxID, + } + + s := &service{ + id: testSandboxID, + sandbox: sandbox, + containers: make(map[string]*container), + } + + // newEventsForwarder will not call publisher to publish events + // so here we can use a nil pointer to test newEventsForwarder + var publisher events.Publisher + + // check log forwarder + forwarder := s.newEventsForwarder(context.Background(), publisher) + assert.Equal(forwarderTypeLog, forwarder.forwarderType()) + + // check containerd forwarder + os.Setenv(ttrpcAddressEnv, "/foo/bar.sock") + defer os.Setenv(ttrpcAddressEnv, "") + forwarder = s.newEventsForwarder(context.Background(), publisher) + assert.Equal(forwarderTypeContainerd, forwarder.forwarderType()) +} diff --git a/src/runtime/pkg/containerd-shim-v2/service.go b/src/runtime/pkg/containerd-shim-v2/service.go index 0cca3ebf8b..8e20ae82fb 100644 --- a/src/runtime/pkg/containerd-shim-v2/service.go +++ b/src/runtime/pkg/containerd-shim-v2/service.go @@ -17,7 +17,6 @@ import ( eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" cdruntime "github.com/containerd/containerd/runtime" cdshim "github.com/containerd/containerd/runtime/v2/shim" @@ -51,10 +50,6 @@ const ( chSize = 128 exitCode255 = 255 - - // A time span used to wait for publish a containerd event, - // once it costs a longer time than timeOut, it will be canceld. - timeOut = 5 * time.Second ) var ( @@ -100,7 +95,8 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu go s.processExits() - go s.forward(ctx, publisher) + forwarder := s.newEventsForwarder(ctx, publisher) + go forwarder.forward() return s, nil } @@ -256,17 +252,6 @@ func (s *service) StartShim(ctx context.Context, opts cdshim.StartOpts) (_ strin return address, nil } -func (s *service) forward(ctx context.Context, publisher events.Publisher) { - for e := range s.events { - ctx, cancel := context.WithTimeout(ctx, timeOut) - err := publisher.Publish(ctx, getTopic(e), e) - cancel() - if err != nil { - shimLog.WithError(err).Error("post event") - } - } -} - func (s *service) send(evt interface{}) { // for unit test, it will not initialize s.events if s.events != nil { diff --git a/src/runtime/virtcontainers/container.go b/src/runtime/virtcontainers/container.go index 13ec548553..ee0a380791 100644 --- a/src/runtime/virtcontainers/container.go +++ b/src/runtime/virtcontainers/container.go @@ -356,6 +356,7 @@ func (c *Container) Logger() *logrus.Entry { return virtLog.WithFields(logrus.Fields{ "subsystem": "container", "sandbox": c.sandboxID, + "container": c.id, }) }