mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Unregister csiplugin even if socket path is gone
Signed-off-by: Ted Yu <yuzhihong@gmail.com>
This commit is contained in:
parent
acd97b42f3
commit
c7bde41478
@ -75,6 +75,18 @@ type actualStateOfWorld struct {
|
||||
|
||||
var _ ActualStateOfWorld = &actualStateOfWorld{}
|
||||
|
||||
// NamedPluginHandler holds information for handler and the name of the plugin
|
||||
type NamedPluginHandler struct {
|
||||
Handler PluginHandler
|
||||
Name string
|
||||
}
|
||||
|
||||
// SocketPluginHandlers contains the map from socket path to NamedPluginHandler
|
||||
type SocketPluginHandlers struct {
|
||||
Handlers map[string]NamedPluginHandler
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// PluginInfo holds information of a plugin
|
||||
type PluginInfo struct {
|
||||
SocketPath string
|
||||
|
@ -45,11 +45,11 @@ import (
|
||||
type OperationExecutor interface {
|
||||
// RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
|
||||
// It then updates the actual state of the world to reflect that.
|
||||
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
|
||||
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error
|
||||
|
||||
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
|
||||
// It then updates the actual state of the world to reflect that.
|
||||
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
|
||||
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error
|
||||
}
|
||||
|
||||
// NewOperationExecutor returns a new instance of OperationExecutor.
|
||||
@ -96,9 +96,10 @@ func (oe *operationExecutor) RegisterPlugin(
|
||||
socketPath string,
|
||||
timestamp time.Time,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorld ActualStateOfWorldUpdater) error {
|
||||
generatedOperation :=
|
||||
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, pathToHandlers, actualStateOfWorld)
|
||||
|
||||
return oe.pendingOperations.Run(
|
||||
socketPath, generatedOperation)
|
||||
@ -107,9 +108,10 @@ func (oe *operationExecutor) RegisterPlugin(
|
||||
func (oe *operationExecutor) UnregisterPlugin(
|
||||
socketPath string,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorld ActualStateOfWorldUpdater) error {
|
||||
generatedOperation :=
|
||||
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, pathToHandlers, actualStateOfWorld)
|
||||
|
||||
return oe.pendingOperations.Run(
|
||||
socketPath, generatedOperation)
|
||||
|
@ -44,9 +44,10 @@ func init() {
|
||||
|
||||
func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
|
||||
ch, quit, oe := setup()
|
||||
hdlr := cache.SocketPluginHandlers{}
|
||||
for i := 0; i < numPluginsToRegister; i++ {
|
||||
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
|
||||
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
|
||||
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
|
||||
}
|
||||
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
|
||||
t.Fatalf("Unable to start register operations in Concurrent for plugins")
|
||||
@ -56,8 +57,9 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
|
||||
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
|
||||
ch, quit, oe := setup()
|
||||
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
|
||||
hdlr := cache.SocketPluginHandlers{}
|
||||
for i := 0; i < numPluginsToRegister; i++ {
|
||||
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
|
||||
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
|
||||
|
||||
}
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
@ -67,9 +69,10 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
|
||||
|
||||
func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
|
||||
ch, quit, oe := setup()
|
||||
hdlr := cache.SocketPluginHandlers{}
|
||||
for i := 0; i < numPluginsToUnregister; i++ {
|
||||
socketPath := "socket-path" + strconv.Itoa(i)
|
||||
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
|
||||
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
|
||||
|
||||
}
|
||||
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
|
||||
@ -80,8 +83,9 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin
|
||||
func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
|
||||
ch, quit, oe := setup()
|
||||
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
|
||||
hdlr := cache.SocketPluginHandlers{}
|
||||
for i := 0; i < numPluginsToUnregister; i++ {
|
||||
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
|
||||
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
|
||||
|
||||
}
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
@ -105,6 +109,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
|
||||
socketPath string,
|
||||
timestamp time.Time,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
|
||||
|
||||
opFunc := func() error {
|
||||
@ -117,6 +122,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
|
||||
func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
|
||||
socketPath string,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
|
||||
opFunc := func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
|
@ -63,12 +63,14 @@ type OperationGenerator interface {
|
||||
socketPath string,
|
||||
timestamp time.Time,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
|
||||
|
||||
// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
|
||||
GenerateUnregisterPluginFunc(
|
||||
socketPath string,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
|
||||
}
|
||||
|
||||
@ -76,6 +78,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
|
||||
socketPath string,
|
||||
timestamp time.Time,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
|
||||
|
||||
registerPluginFunc := func() error {
|
||||
@ -122,6 +125,12 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
|
||||
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
|
||||
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
|
||||
}
|
||||
pathToHandlers.Lock()
|
||||
if pathToHandlers.Handlers == nil {
|
||||
pathToHandlers.Handlers = make(map[string]cache.NamedPluginHandler)
|
||||
}
|
||||
pathToHandlers.Handlers[socketPath] = cache.NamedPluginHandler{Handler: handler, Name: infoResp.Name}
|
||||
pathToHandlers.Unlock()
|
||||
|
||||
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
|
||||
if err := og.notifyPlugin(client, true, ""); err != nil {
|
||||
@ -135,33 +144,35 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
|
||||
func (og *operationGenerator) GenerateUnregisterPluginFunc(
|
||||
socketPath string,
|
||||
pluginHandlers map[string]cache.PluginHandler,
|
||||
pathToHandlers *cache.SocketPluginHandlers,
|
||||
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
|
||||
|
||||
unregisterPluginFunc := func() error {
|
||||
client, conn, err := dial(socketPath, dialTimeoutDuration)
|
||||
_, conn, err := dial(socketPath, dialTimeoutDuration)
|
||||
if err != nil {
|
||||
return fmt.Errorf("UnregisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
|
||||
klog.V(4).Infof("unable to dial: %v", err)
|
||||
} else {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
handler, ok := pluginHandlers[infoResp.Type]
|
||||
if !ok {
|
||||
return fmt.Errorf("UnregisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
|
||||
}
|
||||
var handlerWithName cache.NamedPluginHandler
|
||||
pathToHandlers.Lock()
|
||||
handlerWithName, handlerFound := pathToHandlers.Handlers[socketPath]
|
||||
pathToHandlers.Unlock()
|
||||
|
||||
if !handlerFound {
|
||||
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", socketPath)
|
||||
}
|
||||
// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
|
||||
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
|
||||
actualStateOfWorldUpdater.RemovePlugin(socketPath)
|
||||
|
||||
handler.DeRegisterPlugin(infoResp.Name)
|
||||
handlerWithName.Handler.DeRegisterPlugin(handlerWithName.Name)
|
||||
|
||||
pathToHandlers.Lock()
|
||||
delete(pathToHandlers.Handlers, socketPath)
|
||||
pathToHandlers.Unlock()
|
||||
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", handlerWithName.Name, handlerWithName.Handler)
|
||||
return nil
|
||||
}
|
||||
return unregisterPluginFunc
|
||||
|
@ -67,6 +67,7 @@ func NewReconciler(
|
||||
desiredStateOfWorld: desiredStateOfWorld,
|
||||
actualStateOfWorld: actualStateOfWorld,
|
||||
handlers: make(map[string]cache.PluginHandler),
|
||||
pathToHandlers: cache.SocketPluginHandlers{Handlers: make(map[string]cache.NamedPluginHandler)},
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,6 +77,7 @@ type reconciler struct {
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
actualStateOfWorld cache.ActualStateOfWorld
|
||||
handlers map[string]cache.PluginHandler
|
||||
pathToHandlers cache.SocketPluginHandlers
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@ -103,6 +105,13 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
|
||||
return rc.handlers
|
||||
}
|
||||
|
||||
func (rc *reconciler) getPathToHandlers() *cache.SocketPluginHandlers {
|
||||
rc.RLock()
|
||||
defer rc.RUnlock()
|
||||
|
||||
return &rc.pathToHandlers
|
||||
}
|
||||
|
||||
func (rc *reconciler) reconcile() {
|
||||
// Unregisterations are triggered before registrations
|
||||
|
||||
@ -127,7 +136,7 @@ func (rc *reconciler) reconcile() {
|
||||
|
||||
if unregisterPlugin {
|
||||
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
|
||||
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.actualStateOfWorld)
|
||||
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld)
|
||||
if err != nil &&
|
||||
!goroutinemap.IsAlreadyExists(err) &&
|
||||
!exponentialbackoff.IsExponentialBackoff(err) {
|
||||
@ -145,7 +154,7 @@ func (rc *reconciler) reconcile() {
|
||||
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
|
||||
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
|
||||
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
|
||||
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
|
||||
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld)
|
||||
if err != nil &&
|
||||
!goroutinemap.IsAlreadyExists(err) &&
|
||||
!exponentialbackoff.IsExponentialBackoff(err) {
|
||||
|
@ -252,6 +252,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
|
||||
}
|
||||
|
||||
dsw.RemovePlugin(socketPath)
|
||||
os.Remove(socketPath)
|
||||
waitForUnregistration(t, socketPath, asw)
|
||||
|
||||
// Get asw plugins; it should no longer contain the added plugin
|
||||
|
Loading…
Reference in New Issue
Block a user