Simplify unregistration of csiplugin

Signed-off-by: Ted Yu <yuzhihong@gmail.com>
This commit is contained in:
Ted Yu 2020-04-13 15:04:52 -07:00
parent a7f43a7203
commit 1001be85ad
6 changed files with 34 additions and 78 deletions

View File

@ -75,22 +75,12 @@ type actualStateOfWorld struct {
var _ ActualStateOfWorld = &actualStateOfWorld{} var _ ActualStateOfWorld = &actualStateOfWorld{}
// NamedPluginHandler holds information for handler and the name of the plugin
type NamedPluginHandler struct {
Handler PluginHandler
Name string
}
// SocketPluginHandlers contains the map from socket path to NamedPluginHandler
type SocketPluginHandlers struct {
Handlers map[string]NamedPluginHandler
sync.Mutex
}
// PluginInfo holds information of a plugin // PluginInfo holds information of a plugin
type PluginInfo struct { type PluginInfo struct {
SocketPath string SocketPath string
Timestamp time.Time Timestamp time.Time
Handler PluginHandler
Name string
} }
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {

View File

@ -30,6 +30,8 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
pluginInfo := PluginInfo{ pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(), Timestamp: time.Now(),
Handler: nil,
Name: "test",
} }
asw := NewActualStateOfWorld() asw := NewActualStateOfWorld()
err := asw.AddPlugin(pluginInfo) err := asw.AddPlugin(pluginInfo)
@ -61,6 +63,8 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
pluginInfo := PluginInfo{ pluginInfo := PluginInfo{
SocketPath: "", SocketPath: "",
Timestamp: time.Now(), Timestamp: time.Now(),
Handler: nil,
Name: "test",
} }
err := asw.AddPlugin(pluginInfo) err := asw.AddPlugin(pluginInfo)
require.EqualError(t, err, "socket path is empty") require.EqualError(t, err, "socket path is empty")
@ -86,6 +90,8 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
pluginInfo := PluginInfo{ pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(), Timestamp: time.Now(),
Handler: nil,
Name: "test",
} }
err := asw.AddPlugin(pluginInfo) err := asw.AddPlugin(pluginInfo)
// Assert // Assert
@ -116,6 +122,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
pluginInfo := PluginInfo{ pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(), Timestamp: time.Now(),
Handler: nil,
Name: "test",
} }
err := asw.AddPlugin(pluginInfo) err := asw.AddPlugin(pluginInfo)
// Assert // Assert

View File

@ -45,11 +45,11 @@ import (
type OperationExecutor interface { type OperationExecutor interface {
// RegisterPlugin registers the given plugin using the a handler in the plugin handler map. // RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that. // It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map. // UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that. // It then updates the actual state of the world to reflect that.
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
} }
// NewOperationExecutor returns a new instance of OperationExecutor. // NewOperationExecutor returns a new instance of OperationExecutor.
@ -96,23 +96,20 @@ func (oe *operationExecutor) RegisterPlugin(
socketPath string, socketPath string,
timestamp time.Time, timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler, pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorld ActualStateOfWorldUpdater) error { actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation := generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, pathToHandlers, actualStateOfWorld) oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
socketPath, generatedOperation) socketPath, generatedOperation)
} }
func (oe *operationExecutor) UnregisterPlugin( func (oe *operationExecutor) UnregisterPlugin(
socketPath string, pluginInfo cache.PluginInfo,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorld ActualStateOfWorldUpdater) error { actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation := generatedOperation :=
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, pathToHandlers, actualStateOfWorld) oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
socketPath, generatedOperation) pluginInfo.SocketPath, generatedOperation)
} }

View File

@ -44,10 +44,9 @@ func init() {
func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) { func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
ch, quit, oe := setup() ch, quit, oe := setup()
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToRegister; i++ { for i := 0; i < numPluginsToRegister; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
} }
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) { if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
t.Fatalf("Unable to start register operations in Concurrent for plugins") t.Fatalf("Unable to start register operations in Concurrent for plugins")
@ -57,9 +56,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
ch, quit, oe := setup() ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToRegister; i++ { for i := 0; i < numPluginsToRegister; i++ {
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
} }
if !isOperationRunSerially(ch, quit) { if !isOperationRunSerially(ch, quit) {
@ -69,10 +67,10 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) { func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup() ch, quit, oe := setup()
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToUnregister; i++ { for i := 0; i < numPluginsToUnregister; i++ {
socketPath := "socket-path" + strconv.Itoa(i) socketPath := "socket-path" + strconv.Itoa(i)
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
} }
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) { if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
@ -83,9 +81,9 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin
func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) { func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup() ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToUnregister; i++ { for i := 0; i < numPluginsToUnregister; i++ {
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
} }
if !isOperationRunSerially(ch, quit) { if !isOperationRunSerially(ch, quit) {
@ -109,7 +107,6 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
socketPath string, socketPath string,
timestamp time.Time, timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler, pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error { opFunc := func() error {
@ -120,9 +117,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
} }
func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc( func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
socketPath string, pluginInfo cache.PluginInfo,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error { opFunc := func() error {
startOperationAndBlock(fopg.ch, fopg.quit) startOperationAndBlock(fopg.ch, fopg.quit)

View File

@ -63,14 +63,11 @@ type OperationGenerator interface {
socketPath string, socketPath string,
timestamp time.Time, timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler, pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin // Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
GenerateUnregisterPluginFunc( GenerateUnregisterPluginFunc(
socketPath string, pluginInfo cache.PluginInfo,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
} }
@ -78,7 +75,6 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
socketPath string, socketPath string,
timestamp time.Time, timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler, pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
registerPluginFunc := func() error { registerPluginFunc := func() error {
@ -118,6 +114,8 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{ err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath, SocketPath: socketPath,
Timestamp: timestamp, Timestamp: timestamp,
Handler: handler,
Name: infoResp.Name,
}) })
if err != nil { if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err) klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
@ -125,12 +123,6 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
} }
pathToHandlers.Lock()
if pathToHandlers.Handlers == nil {
pathToHandlers.Handlers = make(map[string]cache.NamedPluginHandler)
}
pathToHandlers.Handlers[socketPath] = cache.NamedPluginHandler{Handler: handler, Name: infoResp.Name}
pathToHandlers.Unlock()
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
if err := og.notifyPlugin(client, true, ""); err != nil { if err := og.notifyPlugin(client, true, ""); err != nil {
@ -142,37 +134,20 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
} }
func (og *operationGenerator) GenerateUnregisterPluginFunc( func (og *operationGenerator) GenerateUnregisterPluginFunc(
socketPath string, pluginInfo cache.PluginInfo,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
unregisterPluginFunc := func() error { unregisterPluginFunc := func() error {
_, conn, err := dial(socketPath, dialTimeoutDuration) if pluginInfo.Handler == nil {
if err != nil { return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
klog.V(4).Infof("unable to dial: %v", err)
} else {
conn.Close()
}
var handlerWithName cache.NamedPluginHandler
pathToHandlers.Lock()
handlerWithName, handlerFound := pathToHandlers.Handlers[socketPath]
pathToHandlers.Unlock()
if !handlerFound {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", socketPath)
} }
// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle // We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
// so that if we receive a register event during Register Plugin, we can process it as a Register call. // so that if we receive a register event during Register Plugin, we can process it as a Register call.
actualStateOfWorldUpdater.RemovePlugin(socketPath) actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
handlerWithName.Handler.DeRegisterPlugin(handlerWithName.Name) pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
pathToHandlers.Lock() klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler)
delete(pathToHandlers.Handlers, socketPath)
pathToHandlers.Unlock()
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", handlerWithName.Name, handlerWithName.Handler)
return nil return nil
} }
return unregisterPluginFunc return unregisterPluginFunc

View File

@ -67,7 +67,6 @@ func NewReconciler(
desiredStateOfWorld: desiredStateOfWorld, desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld, actualStateOfWorld: actualStateOfWorld,
handlers: make(map[string]cache.PluginHandler), handlers: make(map[string]cache.PluginHandler),
pathToHandlers: cache.SocketPluginHandlers{Handlers: make(map[string]cache.NamedPluginHandler)},
} }
} }
@ -77,7 +76,6 @@ type reconciler struct {
desiredStateOfWorld cache.DesiredStateOfWorld desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld actualStateOfWorld cache.ActualStateOfWorld
handlers map[string]cache.PluginHandler handlers map[string]cache.PluginHandler
pathToHandlers cache.SocketPluginHandlers
sync.RWMutex sync.RWMutex
} }
@ -105,13 +103,6 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
return rc.handlers return rc.handlers
} }
func (rc *reconciler) getPathToHandlers() *cache.SocketPluginHandlers {
rc.RLock()
defer rc.RUnlock()
return &rc.pathToHandlers
}
func (rc *reconciler) reconcile() { func (rc *reconciler) reconcile() {
// Unregisterations are triggered before registrations // Unregisterations are triggered before registrations
@ -136,7 +127,7 @@ func (rc *reconciler) reconcile() {
if unregisterPlugin { if unregisterPlugin {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", "")) klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld) err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
if err != nil && if err != nil &&
!goroutinemap.IsAlreadyExists(err) && !goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) { !exponentialbackoff.IsExponentialBackoff(err) {
@ -154,7 +145,7 @@ func (rc *reconciler) reconcile() {
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() { for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) { if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", "")) klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld) err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil && if err != nil &&
!goroutinemap.IsAlreadyExists(err) && !goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) { !exponentialbackoff.IsExponentialBackoff(err) {