mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 22:05:59 +00:00
Device Plugin now closes client connexion
This commit is contained in:
parent
773268d533
commit
d2f08c94a9
@ -33,7 +33,8 @@ import (
|
|||||||
// for managing gRPC communications with the device plugin and caching
|
// for managing gRPC communications with the device plugin and caching
|
||||||
// device states reported by the device plugin.
|
// device states reported by the device plugin.
|
||||||
type endpoint struct {
|
type endpoint struct {
|
||||||
client pluginapi.DevicePluginClient
|
client pluginapi.DevicePluginClient
|
||||||
|
clientConn *grpc.ClientConn
|
||||||
|
|
||||||
socketPath string
|
socketPath string
|
||||||
resourceName string
|
resourceName string
|
||||||
@ -42,32 +43,25 @@ type endpoint struct {
|
|||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
|
|
||||||
callback MonitorCallback
|
callback MonitorCallback
|
||||||
|
|
||||||
cancel context.CancelFunc
|
|
||||||
ctx context.Context
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newEndpoint creates a new endpoint for the given resourceName.
|
// newEndpoint creates a new endpoint for the given resourceName.
|
||||||
func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
|
func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
|
||||||
client, err := dial(socketPath)
|
client, c, err := dial(socketPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
|
glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, stop := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
return &endpoint{
|
return &endpoint{
|
||||||
client: client,
|
client: client,
|
||||||
|
clientConn: c,
|
||||||
|
|
||||||
socketPath: socketPath,
|
socketPath: socketPath,
|
||||||
resourceName: resourceName,
|
resourceName: resourceName,
|
||||||
|
|
||||||
devices: nil,
|
devices: nil,
|
||||||
callback: callback,
|
callback: callback,
|
||||||
|
|
||||||
cancel: stop,
|
|
||||||
ctx: ctx,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,11 +74,9 @@ func (e *endpoint) getDevices() []*pluginapi.Device {
|
|||||||
// list initializes ListAndWatch gRPC call for the device plugin and gets the
|
// list initializes ListAndWatch gRPC call for the device plugin and gets the
|
||||||
// initial list of the devices. Returns ListAndWatch gRPC stream on success.
|
// initial list of the devices. Returns ListAndWatch gRPC stream on success.
|
||||||
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
|
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
|
||||||
glog.V(3).Infof("Starting List")
|
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
|
||||||
stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf(errListAndWatch, e.resourceName, err)
|
glog.Errorf(errListAndWatch, e.resourceName, err)
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -184,7 +176,6 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
|
|||||||
|
|
||||||
e.callback(e.resourceName, added, updated, deleted)
|
e.callback(e.resourceName, added, updated, deleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// allocate issues Allocate gRPC call to the device plugin.
|
// allocate issues Allocate gRPC call to the device plugin.
|
||||||
@ -195,11 +186,11 @@ func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *endpoint) stop() {
|
func (e *endpoint) stop() {
|
||||||
e.cancel()
|
e.clientConn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// dial establishes the gRPC communication with the registered device plugin.
|
// dial establishes the gRPC communication with the registered device plugin.
|
||||||
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) {
|
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
|
||||||
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
|
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
|
||||||
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
||||||
return net.DialTimeout("unix", addr, timeout)
|
return net.DialTimeout("unix", addr, timeout)
|
||||||
@ -207,8 +198,8 @@ func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err)
|
return nil, nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return pluginapi.NewDevicePluginClient(c), nil
|
return pluginapi.NewDevicePluginClient(c), c, nil
|
||||||
}
|
}
|
||||||
|
@ -206,9 +206,11 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
|||||||
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
|
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
stream, err := e.list()
|
stream, err := e.list()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
|
glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
|
||||||
|
e.stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,12 +221,14 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
|||||||
m.endpoints[r.ResourceName] = e
|
m.endpoints[r.ResourceName] = e
|
||||||
m.mutex.Unlock()
|
m.mutex.Unlock()
|
||||||
glog.V(2).Infof("Registered endpoint %v", e)
|
glog.V(2).Infof("Registered endpoint %v", e)
|
||||||
|
|
||||||
if ok && old != nil {
|
if ok && old != nil {
|
||||||
old.stop()
|
old.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
e.listAndWatch(stream)
|
e.listAndWatch(stream)
|
||||||
|
e.stop()
|
||||||
|
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
|
if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
|
||||||
@ -233,6 +237,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
|||||||
// Issues callback to delete all of devices.
|
// Issues callback to delete all of devices.
|
||||||
e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices())
|
e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices())
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("Unregistered endpoint %v", e)
|
glog.V(2).Infof("Unregistered endpoint %v", e)
|
||||||
m.mutex.Unlock()
|
m.mutex.Unlock()
|
||||||
}()
|
}()
|
||||||
|
Loading…
Reference in New Issue
Block a user