e2e dra: collect and check GRPC calls

If kubelet plugin registration fails, it would be good to know more about the
communication with kubelet. Capturing the GRPC calls and then checking that
makes the failure messages more informative. Here's an example where a failure
was triggered by temporarily modifying the check so that it didn't find the
call:

  [FAILED] Timed out after 30.000s.
  Expected:
      <[]app.GRPCCall | len:2, cap:2>: [
          {
              FullMethod: "/pluginregistration.Registration/GetInfo",
              Request:
                  {},
              Response:
                  endpoint: /var/lib/kubelet/plugins/test-driver/dra.sock
                  name: test-driver.cdi.k8s.io
                  supported_versions:
                  - 1.0.0
                  type: DRAPlugin,
              Err: nil,
          },
          {
              FullMethod: "/pluginregistration.Registration/NotifyRegistrationStatus",
              Request:
                  plugin_registered: true,
              Response:
                  {},
              Err: nil,
          },
      ]
  to contain successful NotifyRegistrationStatus call
This commit is contained in:
Patrick Ohly 2023-05-31 14:28:43 +02:00
parent 1c9f08a1c5
commit d0a64739e2
6 changed files with 103 additions and 22 deletions

View File

@ -135,10 +135,11 @@ func KubeletPluginSocketPath(path string) Option {
}
}
// GRPCInterceptor is called for each incoming gRPC method call.
// GRPCInterceptor is called for each incoming gRPC method call. This option
// may be used more than once and each interceptor will get called.
func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
return func(o *options) error {
o.interceptor = interceptor
o.interceptors = append(o.interceptors, interceptor)
return nil
}
}
@ -150,7 +151,7 @@ type options struct {
draEndpoint endpoint
draAddress string
pluginRegistrationEndpoint endpoint
interceptor grpc.UnaryServerInterceptor
interceptors []grpc.UnaryServerInterceptor
}
// draPlugin combines the kubelet registration service and the DRA node plugin
@ -190,7 +191,7 @@ func Start(nodeServer drapbv1.NodeServer, opts ...Option) (result DRAPlugin, fin
}
// Run the node plugin gRPC server first to ensure that it is ready.
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptor, o.draEndpoint, func(grpcServer *grpc.Server) {
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
drapbv1.RegisterNodeServer(grpcServer, nodeServer)
})
if err != nil {
@ -209,7 +210,7 @@ func Start(nodeServer drapbv1.NodeServer, opts ...Option) (result DRAPlugin, fin
}()
// Now make it available to kubelet.
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptor, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
if err != nil {
return nil, fmt.Errorf("start registrar: %v", err)
}

View File

@ -31,7 +31,7 @@ type nodeRegistrar struct {
}
// startRegistrar returns a running instance.
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptor grpc.UnaryServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
n := &nodeRegistrar{
logger: logger,
registrationServer: registrationServer{
@ -40,7 +40,7 @@ func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptor grpc.Unar
supportedVersions: []string{"1.0.0"}, // TODO: is this correct?
},
}
s, err := startGRPCServer(logger, grpcVerbosity, interceptor, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
registerapi.RegisterRegistrationServer(grpcServer, n)
})
if err != nil {

View File

@ -54,7 +54,7 @@ type endpoint struct {
// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
// which handles requests for arbitrary services.
func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptor grpc.UnaryServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
s := &grpcServer{
logger: logger,
endpoint: endpoint,
@ -79,15 +79,13 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptor grpc.Una
// Run a gRPC server. It will close the listening socket when
// shutting down, so we don't need to do that.
var opts []grpc.ServerOption
var interceptors []grpc.UnaryServerInterceptor
var finalInterceptors []grpc.UnaryServerInterceptor
if grpcVerbosity >= 0 {
interceptors = append(interceptors, s.interceptor)
finalInterceptors = append(finalInterceptors, s.interceptor)
}
if interceptor != nil {
interceptors = append(interceptors, interceptor)
}
if len(interceptors) >= 0 {
opts = append(opts, grpc.ChainUnaryInterceptor(interceptors...))
finalInterceptors = append(finalInterceptors, interceptors...)
if len(finalInterceptors) >= 0 {
opts = append(opts, grpc.ChainUnaryInterceptor(finalInterceptors...))
}
s.server = grpc.NewServer(opts...)
for _, service := range services {

View File

@ -240,14 +240,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
// Wait for registration.
ginkgo.By("wait for plugin registration")
gomega.Eventually(func() []string {
var notRegistered []string
gomega.Eventually(func() map[string][]app.GRPCCall {
notRegistered := make(map[string][]app.GRPCCall)
for nodename, plugin := range d.Nodes {
if !plugin.IsRegistered() {
notRegistered = append(notRegistered, nodename)
calls := plugin.GetGRPCCalls()
if contains, err := app.BeRegistered.Match(calls); err != nil || !contains {
notRegistered[nodename] = calls
}
}
sort.Strings(notRegistered)
return notRegistered
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
}

View File

@ -0,0 +1,32 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"github.com/onsi/gomega/gcustom"
)
// BeRegistered checks that plugin registration has completed.
var BeRegistered = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if call.FullMethod == "/pluginregistration.Registration/NotifyRegistrationStatus" &&
call.Err == nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain successful NotifyRegistrationStatus call")

View File

@ -24,6 +24,8 @@ import (
"path/filepath"
"sync"
"google.golang.org/grpc"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
@ -38,8 +40,23 @@ type ExamplePlugin struct {
driverName string
nodeName string
mutex sync.Mutex
prepared map[ClaimID]bool
mutex sync.Mutex
prepared map[ClaimID]bool
gRPCCalls []GRPCCall
}
type GRPCCall struct {
// FullMethod is the fully qualified, e.g. /package.service/method.
FullMethod string
// Request contains the parameters of the call.
Request interface{}
// Response contains the reply of the plugin. It is nil for calls that are in progress.
Response interface{}
// Err contains the error return value of the plugin. It is nil for calls that are in progress or succeeded.
Err error
}
// ClaimID contains both claim name and UID to simplify debugging. The
@ -94,6 +111,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string,
opts = append(opts,
kubeletplugin.Logger(logger),
kubeletplugin.DriverName(driverName),
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
)
d, err := kubeletplugin.Start(ex, opts...)
if err != nil {
@ -206,3 +224,35 @@ func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
}
return prepared
}
func (ex *ExamplePlugin) recordGRPCCall(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
call := GRPCCall{
FullMethod: info.FullMethod,
Request: req,
}
ex.mutex.Lock()
ex.gRPCCalls = append(ex.gRPCCalls, call)
index := len(ex.gRPCCalls) - 1
ex.mutex.Unlock()
// We don't hold the mutex here to allow concurrent calls.
call.Response, call.Err = handler(ctx, req)
ex.mutex.Lock()
ex.gRPCCalls[index] = call
ex.mutex.Unlock()
return call.Response, call.Err
}
func (ex *ExamplePlugin) GetGRPCCalls() []GRPCCall {
ex.mutex.Lock()
defer ex.mutex.Unlock()
// We must return a new slice, otherwise adding new calls would become
// visible to the caller. We also need to copy the entries because
// they get mutated by recordGRPCCall.
calls := make([]GRPCCall, 0, len(ex.gRPCCalls))
calls = append(calls, ex.gRPCCalls...)
return calls
}