mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-02-22 07:03:28 +00:00
DRA kubelet: avoid deadlock when gRPC connection to driver goes idle
When gRPC notifies the kubelet that a connection ended, the kubelet tries to reconnect because it needs to know when a DRA driver comes back. The same code gets called when a connection goes idle, by default after 30 minutes. In that case, and only in that case, the conn.Connect call deadlocks while calling into the gRPC idle manager. This can be reproduced with a new unit test which artificially shortens the idle timeout. The fix is to move the Connect call into a goroutine because then both HandleConn and Connect can proceed. It's sufficient that Connect finishes at some point; it doesn't need to happen immediately.
This commit is contained in:
@@ -62,6 +62,9 @@ type DRAPluginManager struct {
|
||||
wipingDelay time.Duration
|
||||
streamHandler StreamHandler
|
||||
|
||||
// withIdleTimeout is only for unit testing, ignore if <= 0.
|
||||
withIdleTimeout time.Duration
|
||||
|
||||
wg sync.WaitGroup
|
||||
mutex sync.RWMutex
|
||||
|
||||
@@ -115,7 +118,13 @@ func (m *monitoredPlugin) HandleConn(_ context.Context, stats grpcstats.ConnStat
|
||||
case *grpcstats.ConnEnd:
|
||||
// We have to ask for a reconnect, otherwise gRPC wouldn't try and
|
||||
// thus we wouldn't be notified about a restart of the plugin.
|
||||
m.conn.Connect()
|
||||
//
|
||||
// This must be done in a goroutine because gRPC deadlocks
|
||||
// when called directly from inside HandleConn when a connection
|
||||
// goes idle (and only then). It looks like cc.idlenessMgr.ExitIdleMode
|
||||
// in Connect tries to lock a mutex that is already locked by
|
||||
// the caller of HandleConn.
|
||||
go m.conn.Connect()
|
||||
default:
|
||||
return
|
||||
}
|
||||
@@ -361,12 +370,15 @@ func (pm *DRAPluginManager) add(driverName string, endpoint string, chosenServic
|
||||
// The gRPC connection gets created once. gRPC then connects to the gRPC server on demand.
|
||||
target := "unix:" + endpoint
|
||||
logger.V(4).Info("Creating new gRPC connection", "target", target)
|
||||
conn, err := grpc.NewClient(
|
||||
target,
|
||||
options := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithChainUnaryInterceptor(newMetricsInterceptor(driverName)),
|
||||
grpc.WithStatsHandler(mp),
|
||||
)
|
||||
}
|
||||
if pm.withIdleTimeout > 0 {
|
||||
options = append(options, grpc.WithIdleTimeout(pm.withIdleTimeout))
|
||||
}
|
||||
conn, err := grpc.NewClient(target, options...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create gRPC connection to DRA driver %s plugin at endpoint %s: %w", driverName, endpoint, err)
|
||||
}
|
||||
|
||||
@@ -186,6 +186,44 @@ func TestGRPCConnIsReused(t *testing.T) {
|
||||
require.Equal(t, 2, reusedConns[conn], "expected counter to be 2 but got %d", reusedConns[conn])
|
||||
}
|
||||
|
||||
func TestGRPCConnUsableAfterIdle(t *testing.T) {
|
||||
tCtx := ktesting.Init(t)
|
||||
service := drapbv1.DRAPluginService
|
||||
addr := path.Join(t.TempDir(), "dra.sock")
|
||||
teardown, err := setupFakeGRPCServer(service, addr)
|
||||
require.NoError(t, err)
|
||||
defer teardown()
|
||||
|
||||
driverName := "dummy-driver"
|
||||
|
||||
// ensure the plugin we are using is registered
|
||||
draPlugins := NewDRAPluginManager(tCtx, nil, nil, &mockStreamHandler{}, 0)
|
||||
draPlugins.withIdleTimeout = 5 * time.Second
|
||||
tCtx.ExpectNoError(draPlugins.add(driverName, addr, service, defaultClientCallTimeout), "add plugin")
|
||||
plugin, err := draPlugins.GetPlugin(driverName)
|
||||
tCtx.ExpectNoError(err, "get plugin")
|
||||
|
||||
// The connection doesn't really become idle because HandleConn
|
||||
// kicks it back to ready by calling Connect. Just sleep long
|
||||
// enough here, the code should be reached...
|
||||
tCtx.Log("Waiting for idle timeout...")
|
||||
time.Sleep(2 * draPlugins.withIdleTimeout)
|
||||
|
||||
req := &drapbv1.NodePrepareResourcesRequest{
|
||||
Claims: []*drapbv1.Claim{
|
||||
{
|
||||
Namespace: "dummy-namespace",
|
||||
Uid: "dummy-uid",
|
||||
Name: "dummy-claim",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
callCtx := ktesting.WithTimeout(tCtx, 10*time.Second, "call timed out")
|
||||
_, err = plugin.NodePrepareResources(callCtx, req)
|
||||
tCtx.ExpectNoError(err, "NodePrepareResources")
|
||||
}
|
||||
|
||||
func TestGetDRAPlugin(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
description string
|
||||
|
||||
Reference in New Issue
Block a user