mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-10-31 09:26:52 +00:00 
			
		
		
		
	shim: log events for CRI-O
CRI-O start shim process without setting TTRPC_ADDRESS, that the forwarding events goroutine will get errors. For CRI-O runtime, we can log the events to log file. Fixes: #3733 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
		
							
								
								
									
										88
									
								
								src/runtime/pkg/containerd-shim-v2/event_forwarder.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										88
									
								
								src/runtime/pkg/containerd-shim-v2/event_forwarder.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,88 @@ | |||||||
|  | // Copyright (c) 2022 Ant Group | ||||||
|  | // | ||||||
|  | // SPDX-License-Identifier: Apache-2.0 | ||||||
|  | // | ||||||
|  |  | ||||||
|  | package containerdshim | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"os" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/events" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type forwarderType string | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	forwarderTypeLog        forwarderType = "log" | ||||||
|  | 	forwarderTypeContainerd forwarderType = "containerd" | ||||||
|  |  | ||||||
|  | 	// A time span used to wait for publish a containerd event, | ||||||
|  | 	// once it costs a longer time than timeOut, it will be canceld. | ||||||
|  | 	timeOut = 5 * time.Second | ||||||
|  |  | ||||||
|  | 	// ttrpc address passed from container runtime. | ||||||
|  | 	// For now containerd will pass the address, and CRI-O will not | ||||||
|  | 	ttrpcAddressEnv = "TTRPC_ADDRESS" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type eventsForwarder interface { | ||||||
|  | 	forward() | ||||||
|  | 	forwarderType() forwarderType | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type logForwarder struct { | ||||||
|  | 	s *service | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (lf *logForwarder) forward() { | ||||||
|  | 	for e := range lf.s.events { | ||||||
|  | 		shimLog.WithField("topic", getTopic(e)).Infof("post event: %+v", e) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (lf *logForwarder) forwarderType() forwarderType { | ||||||
|  | 	return forwarderTypeLog | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type containerdForwarder struct { | ||||||
|  | 	s         *service | ||||||
|  | 	ctx       context.Context | ||||||
|  | 	publisher events.Publisher | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cf *containerdForwarder) forward() { | ||||||
|  | 	for e := range cf.s.events { | ||||||
|  | 		ctx, cancel := context.WithTimeout(cf.ctx, timeOut) | ||||||
|  | 		err := cf.publisher.Publish(ctx, getTopic(e), e) | ||||||
|  | 		cancel() | ||||||
|  | 		if err != nil { | ||||||
|  | 			shimLog.WithError(err).Error("post event") | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cf *containerdForwarder) forwarderType() forwarderType { | ||||||
|  | 	return forwarderTypeContainerd | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *service) newEventsForwarder(ctx context.Context, publisher events.Publisher) eventsForwarder { | ||||||
|  | 	var forwarder eventsForwarder | ||||||
|  | 	ttrpcAddress := os.Getenv(ttrpcAddressEnv) | ||||||
|  | 	if ttrpcAddress == "" { | ||||||
|  | 		// non containerd will use log forwarder to write events to log | ||||||
|  | 		forwarder = &logForwarder{ | ||||||
|  | 			s: s, | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		forwarder = &containerdForwarder{ | ||||||
|  | 			s:         s, | ||||||
|  | 			ctx:       ctx, | ||||||
|  | 			publisher: publisher, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return forwarder | ||||||
|  | } | ||||||
							
								
								
									
										45
									
								
								src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								src/runtime/pkg/containerd-shim-v2/event_forwarder_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,45 @@ | |||||||
|  | // Copyright (c) 2022 Ant Group | ||||||
|  | // | ||||||
|  | // SPDX-License-Identifier: Apache-2.0 | ||||||
|  | // | ||||||
|  |  | ||||||
|  | package containerdshim | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"os" | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/events" | ||||||
|  | 	"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock" | ||||||
|  |  | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestNewEventsForwarder(t *testing.T) { | ||||||
|  | 	assert := assert.New(t) | ||||||
|  |  | ||||||
|  | 	sandbox := &vcmock.Sandbox{ | ||||||
|  | 		MockID: testSandboxID, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s := &service{ | ||||||
|  | 		id:         testSandboxID, | ||||||
|  | 		sandbox:    sandbox, | ||||||
|  | 		containers: make(map[string]*container), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// newEventsForwarder will not call publisher to publish events | ||||||
|  | 	// so here we can use a nil pointer to test newEventsForwarder | ||||||
|  | 	var publisher events.Publisher | ||||||
|  |  | ||||||
|  | 	// check log forwarder | ||||||
|  | 	forwarder := s.newEventsForwarder(context.Background(), publisher) | ||||||
|  | 	assert.Equal(forwarderTypeLog, forwarder.forwarderType()) | ||||||
|  |  | ||||||
|  | 	// check containerd forwarder | ||||||
|  | 	os.Setenv(ttrpcAddressEnv, "/foo/bar.sock") | ||||||
|  | 	defer os.Setenv(ttrpcAddressEnv, "") | ||||||
|  | 	forwarder = s.newEventsForwarder(context.Background(), publisher) | ||||||
|  | 	assert.Equal(forwarderTypeContainerd, forwarder.forwarderType()) | ||||||
|  | } | ||||||
| @@ -17,7 +17,6 @@ import ( | |||||||
| 	eventstypes "github.com/containerd/containerd/api/events" | 	eventstypes "github.com/containerd/containerd/api/events" | ||||||
| 	"github.com/containerd/containerd/api/types/task" | 	"github.com/containerd/containerd/api/types/task" | ||||||
| 	"github.com/containerd/containerd/errdefs" | 	"github.com/containerd/containerd/errdefs" | ||||||
| 	"github.com/containerd/containerd/events" |  | ||||||
| 	"github.com/containerd/containerd/namespaces" | 	"github.com/containerd/containerd/namespaces" | ||||||
| 	cdruntime "github.com/containerd/containerd/runtime" | 	cdruntime "github.com/containerd/containerd/runtime" | ||||||
| 	cdshim "github.com/containerd/containerd/runtime/v2/shim" | 	cdshim "github.com/containerd/containerd/runtime/v2/shim" | ||||||
| @@ -51,10 +50,6 @@ const ( | |||||||
|  |  | ||||||
| 	chSize      = 128 | 	chSize      = 128 | ||||||
| 	exitCode255 = 255 | 	exitCode255 = 255 | ||||||
|  |  | ||||||
| 	// A time span used to wait for publish a containerd event, |  | ||||||
| 	// once it costs a longer time than timeOut, it will be canceld. |  | ||||||
| 	timeOut = 5 * time.Second |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| @@ -100,7 +95,8 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu | |||||||
|  |  | ||||||
| 	go s.processExits() | 	go s.processExits() | ||||||
|  |  | ||||||
| 	go s.forward(ctx, publisher) | 	forwarder := s.newEventsForwarder(ctx, publisher) | ||||||
|  | 	go forwarder.forward() | ||||||
|  |  | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
| @@ -256,17 +252,6 @@ func (s *service) StartShim(ctx context.Context, opts cdshim.StartOpts) (_ strin | |||||||
| 	return address, nil | 	return address, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *service) forward(ctx context.Context, publisher events.Publisher) { |  | ||||||
| 	for e := range s.events { |  | ||||||
| 		ctx, cancel := context.WithTimeout(ctx, timeOut) |  | ||||||
| 		err := publisher.Publish(ctx, getTopic(e), e) |  | ||||||
| 		cancel() |  | ||||||
| 		if err != nil { |  | ||||||
| 			shimLog.WithError(err).Error("post event") |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *service) send(evt interface{}) { | func (s *service) send(evt interface{}) { | ||||||
| 	// for unit test, it will not initialize s.events | 	// for unit test, it will not initialize s.events | ||||||
| 	if s.events != nil { | 	if s.events != nil { | ||||||
|   | |||||||
| @@ -356,6 +356,7 @@ func (c *Container) Logger() *logrus.Entry { | |||||||
| 	return virtLog.WithFields(logrus.Fields{ | 	return virtLog.WithFields(logrus.Fields{ | ||||||
| 		"subsystem": "container", | 		"subsystem": "container", | ||||||
| 		"sandbox":   c.sandboxID, | 		"sandbox":   c.sandboxID, | ||||||
|  | 		"container": c.id, | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user