mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 20:17:41 +00:00
fix slow dra unit test
This commit is contained in:
parent
656cb1028e
commit
10b6319e64
@ -19,6 +19,7 @@ package v1beta1
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
core "k8s.io/api/core/v1"
|
core "k8s.io/api/core/v1"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
@ -37,7 +38,7 @@ func (s *server) GetPluginHandler() cache.PluginHandler {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
|
func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
|
||||||
klog.V(2).InfoS("Registering plugin at endpoint", "plugin", pluginName, "endpoint", endpoint)
|
klog.V(2).InfoS("Registering plugin at endpoint", "plugin", pluginName, "endpoint", endpoint)
|
||||||
return s.connectClient(pluginName, endpoint)
|
return s.connectClient(pluginName, endpoint)
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ type fakeDRAServerInfo struct {
|
|||||||
teardownFn tearDown
|
teardownFn tearDown
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupFakeDRADriverGRPCServer(shouldTimeout bool) (fakeDRAServerInfo, error) {
|
func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time.Duration) (fakeDRAServerInfo, error) {
|
||||||
socketDir, err := os.MkdirTemp("", "dra")
|
socketDir, err := os.MkdirTemp("", "dra")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fakeDRAServerInfo{
|
return fakeDRAServerInfo{
|
||||||
@ -117,7 +117,7 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool) (fakeDRAServerInfo, error)
|
|||||||
driverName: driverName,
|
driverName: driverName,
|
||||||
}
|
}
|
||||||
if shouldTimeout {
|
if shouldTimeout {
|
||||||
timeout := plugin.PluginClientTimeout + time.Second
|
timeout := *pluginClientTimeout * 2
|
||||||
fakeDRADriverGRPCServer.timeout = &timeout
|
fakeDRADriverGRPCServer.timeout = &timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -758,14 +758,20 @@ func TestPrepareResources(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
|
var pluginClientTimeout *time.Duration
|
||||||
|
if test.wantTimeout {
|
||||||
|
timeout := time.Millisecond * 20
|
||||||
|
pluginClientTimeout = &timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout, pluginClientTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer draServerInfo.teardownFn()
|
defer draServerInfo.teardownFn()
|
||||||
|
|
||||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
||||||
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
|
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}, pluginClientTimeout); err != nil {
|
||||||
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
|
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
|
||||||
}
|
}
|
||||||
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
|
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
|
||||||
@ -1058,14 +1064,20 @@ func TestUnprepareResources(t *testing.T) {
|
|||||||
t.Fatalf("failed to create a new instance of the claimInfoCache, err: %v", err)
|
t.Fatalf("failed to create a new instance of the claimInfoCache, err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
|
var pluginClientTimeout *time.Duration
|
||||||
|
if test.wantTimeout {
|
||||||
|
timeout := time.Millisecond * 20
|
||||||
|
pluginClientTimeout = &timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout, pluginClientTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer draServerInfo.teardownFn()
|
defer draServerInfo.teardownFn()
|
||||||
|
|
||||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
||||||
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
|
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}, pluginClientTimeout); err != nil {
|
||||||
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
|
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
|
||||||
}
|
}
|
||||||
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
|
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
|
||||||
|
@ -154,7 +154,7 @@ func (p *plugin) NodePrepareResources(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
|
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
version := p.getVersion()
|
version := p.getVersion()
|
||||||
@ -183,7 +183,7 @@ func (p *plugin) NodeUnprepareResources(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
|
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
version := p.getVersion()
|
version := p.getVersion()
|
||||||
|
@ -268,8 +268,9 @@ func TestNodeUnprepareResource(t *testing.T) {
|
|||||||
defer teardown()
|
defer teardown()
|
||||||
|
|
||||||
p := &plugin{
|
p := &plugin{
|
||||||
endpoint: addr,
|
endpoint: addr,
|
||||||
version: v1alpha3Version,
|
version: v1alpha3Version,
|
||||||
|
clientTimeout: PluginClientTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := p.getOrCreateGRPCConn()
|
conn, err := p.getOrCreateGRPCConn()
|
||||||
|
@ -49,6 +49,7 @@ type plugin struct {
|
|||||||
endpoint string
|
endpoint string
|
||||||
version string
|
version string
|
||||||
highestSupportedVersion *utilversion.Version
|
highestSupportedVersion *utilversion.Version
|
||||||
|
clientTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
|
func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
|
||||||
@ -116,7 +117,7 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterPlugin is called when a plugin can be registered.
|
// RegisterPlugin is called when a plugin can be registered.
|
||||||
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
|
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
|
||||||
klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
|
klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
|
||||||
|
|
||||||
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
|
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
|
||||||
@ -124,11 +125,19 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var timeout time.Duration
|
||||||
|
if pluginClientTimeout == nil {
|
||||||
|
timeout = PluginClientTimeout
|
||||||
|
} else {
|
||||||
|
timeout = *pluginClientTimeout
|
||||||
|
}
|
||||||
|
|
||||||
pluginInstance := &plugin{
|
pluginInstance := &plugin{
|
||||||
conn: nil,
|
conn: nil,
|
||||||
endpoint: endpoint,
|
endpoint: endpoint,
|
||||||
version: v1alpha3Version,
|
version: v1alpha3Version,
|
||||||
highestSupportedVersion: highestSupportedVersion,
|
highestSupportedVersion: highestSupportedVersion,
|
||||||
|
clientTimeout: timeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
|
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
|
||||||
|
@ -56,7 +56,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
|
|||||||
description: "plugin already registered with a higher supported version",
|
description: "plugin already registered with a higher supported version",
|
||||||
handler: func() *RegistrationHandler {
|
handler: func() *RegistrationHandler {
|
||||||
handler := newRegistrationHandler()
|
handler := newRegistrationHandler()
|
||||||
if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}); err != nil {
|
if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}, nil); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
return handler
|
return handler
|
||||||
|
4
pkg/kubelet/pluginmanager/cache/types.go
vendored
4
pkg/kubelet/pluginmanager/cache/types.go
vendored
@ -16,6 +16,8 @@ limitations under the License.
|
|||||||
|
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
// PluginHandler is an interface a client of the pluginwatcher API needs to implement in
|
// PluginHandler is an interface a client of the pluginwatcher API needs to implement in
|
||||||
// order to consume plugins
|
// order to consume plugins
|
||||||
// The PluginHandler follows the simple following state machine:
|
// The PluginHandler follows the simple following state machine:
|
||||||
@ -51,7 +53,7 @@ type PluginHandler interface {
|
|||||||
// RegisterPlugin is called so that the plugin can be registered by any
|
// RegisterPlugin is called so that the plugin can be registered by any
|
||||||
// plugin consumer
|
// plugin consumer
|
||||||
// Error encountered here can still be Notified to the plugin.
|
// Error encountered here can still be Notified to the plugin.
|
||||||
RegisterPlugin(pluginName, endpoint string, versions []string) error
|
RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error
|
||||||
// DeRegisterPlugin is called once the pluginwatcher observes that the socket has
|
// DeRegisterPlugin is called once the pluginwatcher observes that the socket has
|
||||||
// been deleted.
|
// been deleted.
|
||||||
DeRegisterPlugin(pluginName string)
|
DeRegisterPlugin(pluginName string)
|
||||||
|
@ -121,7 +121,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
|
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
|
||||||
}
|
}
|
||||||
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
|
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil {
|
||||||
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
|
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterPlugin is a fake method
|
// RegisterPlugin is a fake method
|
||||||
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
|
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
|
||||||
f.Lock()
|
f.Lock()
|
||||||
defer f.Unlock()
|
defer f.Unlock()
|
||||||
f.events = append(f.events, "register "+pluginName)
|
f.events = append(f.events, "register "+pluginName)
|
||||||
|
@ -127,7 +127,7 @@ func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterPlugin is a dummy implementation
|
// RegisterPlugin is a dummy implementation
|
||||||
func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
|
func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterPlugin is called when a plugin can be registered
|
// RegisterPlugin is called when a plugin can be registered
|
||||||
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
|
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
|
||||||
klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
|
klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
|
||||||
|
|
||||||
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
|
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
|
||||||
@ -130,7 +130,14 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
var timeout time.Duration
|
||||||
|
if pluginClientTimeout == nil {
|
||||||
|
timeout = csiTimeout
|
||||||
|
} else {
|
||||||
|
timeout = *pluginClientTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
|
driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
|
||||||
|
Loading…
Reference in New Issue
Block a user