mirror of
https://github.com/k8snetworkplumbingwg/multus-cni.git
synced 2025-08-13 22:07:50 +00:00
Support readinessIndicator file in thick multus-daemon
This change supports readinessIndicatorfile in multus-daemon and refines goroutine termination in case of signal with context.
This commit is contained in:
parent
bf79dc3269
commit
41d5d08686
@ -23,8 +23,11 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
"os/user"
|
"os/user"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
@ -35,6 +38,7 @@ import (
|
|||||||
srv "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server"
|
srv "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server"
|
||||||
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api"
|
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api"
|
||||||
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config"
|
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config"
|
||||||
|
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
)
|
)
|
||||||
@ -54,32 +58,47 @@ func main() {
|
|||||||
os.Exit(4)
|
os.Exit(4)
|
||||||
}
|
}
|
||||||
|
|
||||||
configWatcherStopChannel := make(chan struct{})
|
|
||||||
configWatcherDoneChannel := make(chan struct{})
|
configWatcherDoneChannel := make(chan struct{})
|
||||||
serverStopChannel := make(chan struct{})
|
|
||||||
serverDoneChannel := make(chan struct{})
|
serverDoneChannel := make(chan struct{})
|
||||||
|
multusConfigFile := ""
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
daemonConf, err := cniServerConfig(*configFilePath)
|
daemonConf, err := cniServerConfig(*configFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := startMultusDaemon(daemonConf, serverStopChannel, serverDoneChannel); err != nil {
|
|
||||||
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
|
|
||||||
os.Exit(3)
|
|
||||||
}
|
|
||||||
|
|
||||||
multusConf, err := config.ParseMultusConfig(*configFilePath)
|
multusConf, err := config.ParseMultusConfig(*configFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logging.Panicf("startMultusDaemon failed to load the multus configuration: %v", err)
|
logging.Panicf("startMultusDaemon failed to load the multus configuration: %v", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logging.Verbosef("multus-daemon started")
|
||||||
|
|
||||||
|
if multusConf.ReadinessIndicatorFile != "" {
|
||||||
|
// Check readinessindicator file before daemon launch
|
||||||
|
logging.Verbosef("Readiness Indicator file check")
|
||||||
|
if err := types.GetReadinessIndicatorFile(multusConf.ReadinessIndicatorFile); err != nil {
|
||||||
|
_ = logging.Errorf("have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", multusConf.ReadinessIndicatorFile, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
logging.Verbosef("Readiness Indicator file check done!")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := startMultusDaemon(ctx, daemonConf, serverDoneChannel); err != nil {
|
||||||
|
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
|
||||||
|
os.Exit(3)
|
||||||
|
}
|
||||||
|
|
||||||
// Wait until daemon ready
|
// Wait until daemon ready
|
||||||
|
logging.Verbosef("API readiness check")
|
||||||
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
|
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
|
||||||
logging.Panicf("failed to ready multus-daemon socket: %v", err)
|
logging.Panicf("failed to ready multus-daemon socket: %v", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
logging.Verbosef("API readiness check done!")
|
||||||
|
|
||||||
// Generate multus CNI config from current CNI config
|
// Generate multus CNI config from current CNI config
|
||||||
if multusConf.MultusConfigFile == "auto" {
|
if multusConf.MultusConfigFile == "auto" {
|
||||||
@ -111,39 +130,51 @@ func main() {
|
|||||||
}
|
}
|
||||||
logging.Verbosef("Generated MultusCNI config: %s", generatedMultusConfig)
|
logging.Verbosef("Generated MultusCNI config: %s", generatedMultusConfig)
|
||||||
|
|
||||||
if err := configManager.PersistMultusConfig(generatedMultusConfig); err != nil {
|
multusConfigFile, err = configManager.PersistMultusConfig(generatedMultusConfig)
|
||||||
|
if err != nil {
|
||||||
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
|
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(stopChannel chan<- struct{}, doneChannel chan<- struct{}) {
|
go func(ctx context.Context, doneChannel chan<- struct{}) {
|
||||||
if err := configManager.MonitorPluginConfiguration(configWatcherStopChannel, doneChannel); err != nil {
|
if err := configManager.MonitorPluginConfiguration(ctx, doneChannel); err != nil {
|
||||||
_ = logging.Errorf("error watching file: %v", err)
|
_ = logging.Errorf("error watching file: %v", err)
|
||||||
}
|
}
|
||||||
}(configWatcherStopChannel, configWatcherDoneChannel)
|
}(ctx, configWatcherDoneChannel)
|
||||||
|
|
||||||
<-configWatcherDoneChannel
|
|
||||||
} else {
|
} else {
|
||||||
if err := copyUserProvidedConfig(multusConf.MultusConfigFile, multusConf.CniConfigDir); err != nil {
|
if err := copyUserProvidedConfig(multusConf.MultusConfigFile, multusConf.CniConfigDir); err != nil {
|
||||||
logging.Errorf("failed to copy the user provided configuration %s: %v", multusConf.MultusConfigFile, err)
|
logging.Errorf("failed to copy the user provided configuration %s: %v", multusConf.MultusConfigFile, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serverDone := false
|
signalCh := make(chan os.Signal, 16)
|
||||||
configWatcherDone := false
|
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
|
||||||
for {
|
go func() {
|
||||||
select {
|
for sig := range signalCh {
|
||||||
case <-configWatcherDoneChannel:
|
logging.Verbosef("caught %v, stopping...", sig)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
if multusConf.MultusConfigFile == "auto" {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
<-configWatcherDoneChannel
|
||||||
logging.Verbosef("ConfigWatcher done")
|
logging.Verbosef("ConfigWatcher done")
|
||||||
configWatcherDone = true
|
logging.Verbosef("Delete old config @ %v", multusConfigFile)
|
||||||
case <-serverDoneChannel:
|
os.Remove(multusConfigFile)
|
||||||
logging.Verbosef("multus-server done.")
|
wg.Done()
|
||||||
serverDone = true
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
if serverDone && configWatcherDone {
|
wg.Add(1)
|
||||||
return
|
go func() {
|
||||||
}
|
<-serverDoneChannel
|
||||||
}
|
logging.Verbosef("multus-server done.")
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
// never reached
|
// never reached
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,7 +188,7 @@ func waitUntilAPIReady(socketPath string) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}, done chan struct{}) error {
|
func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, done chan struct{}) error {
|
||||||
if user, err := user.Current(); err != nil || user.Uid != "0" {
|
if user, err := user.Current(); err != nil || user.Uid != "0" {
|
||||||
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
|
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
|
||||||
}
|
}
|
||||||
@ -172,11 +203,11 @@ func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
if daemonConfig.MetricsPort != nil {
|
if daemonConfig.MetricsPort != nil {
|
||||||
go utilwait.Until(func() {
|
go utilwait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||||
http.Handle("/metrics", promhttp.Handler())
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
logging.Debugf("metrics port: %d", *daemonConfig.MetricsPort)
|
logging.Debugf("metrics port: %d", *daemonConfig.MetricsPort)
|
||||||
logging.Debugf("metrics: %s", http.ListenAndServe(fmt.Sprintf(":%d", *daemonConfig.MetricsPort), nil))
|
logging.Debugf("metrics: %s", http.ListenAndServe(fmt.Sprintf(":%d", *daemonConfig.MetricsPort), nil))
|
||||||
}, 0, stopCh)
|
}, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
l, err := srv.GetListener(api.SocketPath(daemonConfig.SocketDir))
|
l, err := srv.GetListener(api.SocketPath(daemonConfig.SocketDir))
|
||||||
@ -186,13 +217,13 @@ func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}
|
|||||||
|
|
||||||
server.SetKeepAlivesEnabled(false)
|
server.SetKeepAlivesEnabled(false)
|
||||||
go func() {
|
go func() {
|
||||||
utilwait.Until(func() {
|
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||||
logging.Debugf("open for business")
|
logging.Debugf("open for business")
|
||||||
if err := server.Serve(l); err != nil {
|
if err := server.Serve(l); err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
|
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
|
||||||
}
|
}
|
||||||
}, 0, stopCh)
|
}, 0)
|
||||||
server.Shutdown(context.TODO())
|
server.Shutdown(context.Background())
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -58,11 +58,6 @@ var (
|
|||||||
releaseStatus = ""
|
releaseStatus = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
pollDuration = 1000 * time.Millisecond
|
|
||||||
pollTimeout = 45 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
// PrintVersionString ...
|
// PrintVersionString ...
|
||||||
func PrintVersionString() string {
|
func PrintVersionString() string {
|
||||||
return fmt.Sprintf("version:%s(%s%s), commit:%s, date:%s", version, gitTreeState, releaseStatus, commit, date)
|
return fmt.Sprintf("version:%s(%s%s), commit:%s, date:%s", version, gitTreeState, releaseStatus, commit, date)
|
||||||
@ -575,11 +570,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
|
|||||||
}
|
}
|
||||||
|
|
||||||
if n.ReadinessIndicatorFile != "" {
|
if n.ReadinessIndicatorFile != "" {
|
||||||
err := wait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) {
|
if err := types.GetReadinessIndicatorFile(n.ReadinessIndicatorFile); err != nil {
|
||||||
_, err := os.Stat(n.ReadinessIndicatorFile)
|
|
||||||
return err == nil, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, cmdErr(k8sArgs, "have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", n.ReadinessIndicatorFile, err)
|
return nil, cmdErr(k8sArgs, "have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", n.ReadinessIndicatorFile, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -813,11 +804,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
if in.ReadinessIndicatorFile != "" {
|
if in.ReadinessIndicatorFile != "" {
|
||||||
err := wait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) {
|
if err := types.GetReadinessIndicatorFile(in.ReadinessIndicatorFile); err != nil {
|
||||||
_, err := os.Stat(in.ReadinessIndicatorFile)
|
|
||||||
return err == nil, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return cmdErr(k8sArgs, "PollImmediate error waiting for ReadinessIndicatorFile (on del): %v", err)
|
return cmdErr(k8sArgs, "PollImmediate error waiting for ReadinessIndicatorFile (on del): %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,9 +15,11 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
|
|
||||||
@ -39,6 +41,7 @@ type Manager struct {
|
|||||||
multusConfig *MultusConf
|
multusConfig *MultusConf
|
||||||
multusConfigDir string
|
multusConfigDir string
|
||||||
multusConfigFilePath string
|
multusConfigFilePath string
|
||||||
|
readinessIndicatorFilePath string
|
||||||
primaryCNIConfigPath string
|
primaryCNIConfigPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,7 +98,12 @@ func newManager(config MultusConf, multusConfigDir, defaultCNIPluginName string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher, err := newWatcher(multusConfigDir)
|
readinessIndicatorPath := ""
|
||||||
|
if config.ReadinessIndicatorFile != "" {
|
||||||
|
readinessIndicatorPath = filepath.Dir(config.ReadinessIndicatorFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher, err := newWatcher(multusConfigDir, readinessIndicatorPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -110,6 +118,7 @@ func newManager(config MultusConf, multusConfigDir, defaultCNIPluginName string,
|
|||||||
multusConfigDir: multusConfigDir,
|
multusConfigDir: multusConfigDir,
|
||||||
multusConfigFilePath: cniPluginConfigFilePath(config.CniConfigDir, multusConfigFileName),
|
multusConfigFilePath: cniPluginConfigFilePath(config.CniConfigDir, multusConfigFileName),
|
||||||
primaryCNIConfigPath: cniPluginConfigFilePath(multusConfigDir, defaultCNIPluginName),
|
primaryCNIConfigPath: cniPluginConfigFilePath(multusConfigDir, defaultCNIPluginName),
|
||||||
|
readinessIndicatorFilePath: config.ReadinessIndicatorFile,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := configManager.loadPrimaryCNIConfigFromFile(); err != nil {
|
if err := configManager.loadPrimaryCNIConfigFromFile(); err != nil {
|
||||||
@ -157,7 +166,7 @@ func (m *Manager) loadPrimaryCNIConfigurationData(primaryCNIConfigData interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GenerateConfig generates a multus configuration from its current state
|
// GenerateConfig generates a multus configuration from its current state
|
||||||
func (m Manager) GenerateConfig() (string, error) {
|
func (m *Manager) GenerateConfig() (string, error) {
|
||||||
if err := m.loadPrimaryCNIConfigFromFile(); err != nil {
|
if err := m.loadPrimaryCNIConfigFromFile(); err != nil {
|
||||||
_ = logging.Errorf("failed to read the primary CNI plugin config from %s", m.primaryCNIConfigPath)
|
_ = logging.Errorf("failed to read the primary CNI plugin config from %s", m.primaryCNIConfigPath)
|
||||||
return "", nil
|
return "", nil
|
||||||
@ -168,22 +177,22 @@ func (m Manager) GenerateConfig() (string, error) {
|
|||||||
// MonitorPluginConfiguration monitors the configuration file pointed
|
// MonitorPluginConfiguration monitors the configuration file pointed
|
||||||
// to by the primaryCNIPluginName attribute, and re-generates the multus
|
// to by the primaryCNIPluginName attribute, and re-generates the multus
|
||||||
// configuration whenever the primary CNI config is updated.
|
// configuration whenever the primary CNI config is updated.
|
||||||
func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan<- struct{}) error {
|
func (m *Manager) MonitorPluginConfiguration(ctx context.Context, done chan<- struct{}) error {
|
||||||
logging.Verbosef("started to watch file %s", m.primaryCNIConfigPath)
|
logging.Verbosef("started to watch file %s", m.primaryCNIConfigPath)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-m.configWatcher.Events:
|
case event := <-m.configWatcher.Events:
|
||||||
// we're watching the DIR where the config sits, and the event
|
if !m.shouldRegenerateConfig(event) {
|
||||||
// does not concern the primary CNI config. Skip it.
|
|
||||||
if event.Name != m.primaryCNIConfigPath {
|
|
||||||
logging.Debugf("skipping un-related event %v", event)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
logging.Debugf("process event: %v", event)
|
logging.Debugf("process event: %v", event)
|
||||||
|
|
||||||
if !shouldRegenerateConfig(event) {
|
// if readinessIndicatorFile is removed, then restart multus
|
||||||
continue
|
if m.readinessIndicatorFilePath != "" && m.readinessIndicatorFilePath == event.Name {
|
||||||
|
logging.Verbosef("readiness indicator file is gone. restart multus-daemon")
|
||||||
|
os.Remove(m.multusConfigFilePath)
|
||||||
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
updatedConfig, err := m.GenerateConfig()
|
updatedConfig, err := m.GenerateConfig()
|
||||||
@ -192,7 +201,7 @@ func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan<
|
|||||||
}
|
}
|
||||||
|
|
||||||
logging.Debugf("Re-generated MultusCNI config: %s", updatedConfig)
|
logging.Debugf("Re-generated MultusCNI config: %s", updatedConfig)
|
||||||
if err := m.PersistMultusConfig(updatedConfig); err != nil {
|
if _, err := m.PersistMultusConfig(updatedConfig); err != nil {
|
||||||
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
|
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
|
||||||
}
|
}
|
||||||
if err := m.loadPrimaryCNIConfigFromFile(); err != nil {
|
if err := m.loadPrimaryCNIConfigFromFile(); err != nil {
|
||||||
@ -205,10 +214,9 @@ func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan<
|
|||||||
}
|
}
|
||||||
logging.Errorf("CNI monitoring error %v", err)
|
logging.Errorf("CNI monitoring error %v", err)
|
||||||
|
|
||||||
case <-shutDown:
|
case <-ctx.Done():
|
||||||
logging.Verbosef("Stopped monitoring, closing channel ...")
|
logging.Verbosef("Stopped monitoring, closing channel ...")
|
||||||
_ = m.configWatcher.Close()
|
_ = m.configWatcher.Close()
|
||||||
close(done)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -216,9 +224,24 @@ func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan<
|
|||||||
|
|
||||||
// PersistMultusConfig persists the provided configuration to the disc, with
|
// PersistMultusConfig persists the provided configuration to the disc, with
|
||||||
// Read / Write permissions. The output file path is `<multus auto config dir>/00-multus.conf`
|
// Read / Write permissions. The output file path is `<multus auto config dir>/00-multus.conf`
|
||||||
func (m Manager) PersistMultusConfig(config string) error {
|
func (m *Manager) PersistMultusConfig(config string) (string, error) {
|
||||||
logging.Debugf("Writing Multus CNI configuration @ %s", m.multusConfigFilePath)
|
logging.Debugf("Writing Multus CNI configuration @ %s", m.multusConfigFilePath)
|
||||||
return os.WriteFile(m.multusConfigFilePath, []byte(config), UserRWPermission)
|
return m.multusConfigFilePath, os.WriteFile(m.multusConfigFilePath, []byte(config), UserRWPermission)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) shouldRegenerateConfig(event fsnotify.Event) bool {
|
||||||
|
// first, check the readiness indicator file existence
|
||||||
|
if event.Name == m.readinessIndicatorFilePath {
|
||||||
|
return event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename)
|
||||||
|
}
|
||||||
|
|
||||||
|
// we're watching the DIR where the config sits, and the event
|
||||||
|
// does not concern the primary CNI config. Skip it.
|
||||||
|
if event.Name == m.primaryCNIConfigPath {
|
||||||
|
return event.Has(fsnotify.Write) || event.Has(fsnotify.Create)
|
||||||
|
}
|
||||||
|
logging.Debugf("skipping un-related event %v", event)
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPrimaryCNIPluginName(multusAutoconfigDir string) (string, error) {
|
func getPrimaryCNIPluginName(multusAutoconfigDir string) (string, error) {
|
||||||
@ -233,7 +256,7 @@ func cniPluginConfigFilePath(cniConfigDir string, cniConfigFileName string) stri
|
|||||||
return cniConfigDir + fmt.Sprintf("/%s", cniConfigFileName)
|
return cniConfigDir + fmt.Sprintf("/%s", cniConfigFileName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWatcher(cniConfigDir string) (*fsnotify.Watcher, error) {
|
func newWatcher(cniConfigDir string, readinessIndicatorDir string) (*fsnotify.Watcher, error) {
|
||||||
watcher, err := fsnotify.NewWatcher()
|
watcher, err := fsnotify.NewWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create new watcher for %q: %v", cniConfigDir, err)
|
return nil, fmt.Errorf("failed to create new watcher for %q: %v", cniConfigDir, err)
|
||||||
@ -246,16 +269,18 @@ func newWatcher(cniConfigDir string) (*fsnotify.Watcher, error) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
if err = watcher.Add(cniConfigDir); err != nil {
|
if err = watcher.Add(cniConfigDir); err != nil {
|
||||||
return nil, fmt.Errorf("failed to add watch on %q: %v", cniConfigDir, err)
|
return nil, fmt.Errorf("failed to add watch on %q for cni config: %v", cniConfigDir, err)
|
||||||
|
}
|
||||||
|
// if readinessIndicatorDir is different from cniConfigDir,
|
||||||
|
if readinessIndicatorDir != "" && cniConfigDir != readinessIndicatorDir {
|
||||||
|
if err = watcher.Add(readinessIndicatorDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to add watch on %q for readinessIndicator: %v", readinessIndicatorDir, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return watcher, nil
|
return watcher, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldRegenerateConfig(event fsnotify.Event) bool {
|
|
||||||
return event.Has(fsnotify.Write) || event.Has(fsnotify.Create)
|
|
||||||
}
|
|
||||||
|
|
||||||
func primaryCNIData(masterCNIPluginPath string) (interface{}, error) {
|
func primaryCNIData(masterCNIPluginPath string) (interface{}, error) {
|
||||||
masterCNIConfigData, err := os.ReadFile(masterCNIPluginPath)
|
masterCNIConfigData, err := os.ReadFile(masterCNIPluginPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
@ -98,14 +99,15 @@ var _ = Describe("Configuration Manager", func() {
|
|||||||
config, err := configManager.GenerateConfig()
|
config, err := configManager.GenerateConfig()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
err = configManager.PersistMultusConfig(config)
|
_, err = configManager.PersistMultusConfig(config)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
configWatcherDoneChannel := make(chan struct{})
|
configWatcherDoneChannel := make(chan struct{})
|
||||||
go func(stopChannel chan struct{}, doneChannel chan struct{}) {
|
go func(ctx context.Context, doneChannel chan struct{}) {
|
||||||
err := configManager.MonitorPluginConfiguration(configWatcherDoneChannel, stopChannel)
|
err := configManager.MonitorPluginConfiguration(ctx, doneChannel)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
}(make(chan struct{}), configWatcherDoneChannel)
|
}(ctx, configWatcherDoneChannel)
|
||||||
|
|
||||||
updatedCNIConfig := `
|
updatedCNIConfig := `
|
||||||
{
|
{
|
||||||
@ -126,7 +128,7 @@ var _ = Describe("Configuration Manager", func() {
|
|||||||
Expect(string(file)).To(Equal(config))
|
Expect(string(file)).To(Equal(config))
|
||||||
|
|
||||||
// stop groutine
|
// stop groutine
|
||||||
configWatcherDoneChannel <- struct{}{}
|
cancel()
|
||||||
})
|
})
|
||||||
|
|
||||||
When("the user requests the name of the multus configuration to be overridden", func() {
|
When("the user requests the name of the multus configuration to be overridden", func() {
|
||||||
|
@ -57,13 +57,18 @@ func FilesystemPreRequirements(rundir string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func printCmdArgs(args *skel.CmdArgs) string {
|
||||||
|
return fmt.Sprintf("ContainerID:%q Netns:%q IfName:%q Args:%q Path:%q",
|
||||||
|
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path)
|
||||||
|
}
|
||||||
|
|
||||||
// HandleCNIRequest is the CNI server handler function; it is invoked whenever
|
// HandleCNIRequest is the CNI server handler function; it is invoked whenever
|
||||||
// a CNI request is processed.
|
// a CNI request is processed.
|
||||||
func (s *Server) HandleCNIRequest(cmd string, k8sArgs *types.K8sArgs, cniCmdArgs *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) ([]byte, error) {
|
func (s *Server) HandleCNIRequest(cmd string, k8sArgs *types.K8sArgs, cniCmdArgs *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) ([]byte, error) {
|
||||||
var result []byte
|
var result []byte
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
logging.Verbosef("%s starting CNI request %+v", cmd, cniCmdArgs)
|
logging.Verbosef("%s starting CNI request %s", cmd, printCmdArgs(cniCmdArgs))
|
||||||
switch cmd {
|
switch cmd {
|
||||||
case "ADD":
|
case "ADD":
|
||||||
result, err = cmdAdd(cniCmdArgs, k8sArgs, exec, kubeClient)
|
result, err = cmdAdd(cniCmdArgs, k8sArgs, exec, kubeClient)
|
||||||
@ -74,10 +79,10 @@ func (s *Server) HandleCNIRequest(cmd string, k8sArgs *types.K8sArgs, cniCmdArgs
|
|||||||
default:
|
default:
|
||||||
return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd)
|
return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd)
|
||||||
}
|
}
|
||||||
logging.Verbosef("%s finished CNI request %+v, result: %q, err: %v", cmd, *cniCmdArgs, string(result), err)
|
logging.Verbosef("%s finished CNI request %s, result: %q, err: %v", cmd, printCmdArgs(cniCmdArgs), string(result), err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Prefix errors with request info for easier failure debugging
|
// Prefix errors with request info for easier failure debugging
|
||||||
return nil, fmt.Errorf("%+v ERRORED: %v", *cniCmdArgs, err)
|
return nil, fmt.Errorf("%s ERRORED: %v", printCmdArgs(cniCmdArgs), err)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -95,7 +100,7 @@ func (s *Server) HandleDelegateRequest(cmd string, k8sArgs *types.K8sArgs, cniCm
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logging.Verbosef("%s starting delegate request %+v", cmd, cniCmdArgs)
|
logging.Verbosef("%s starting delegate request %s", cmd, printCmdArgs(cniCmdArgs))
|
||||||
switch cmd {
|
switch cmd {
|
||||||
case "ADD":
|
case "ADD":
|
||||||
result, err = cmdDelegateAdd(cniCmdArgs, k8sArgs, exec, kubeClient, multusConfig, interfaceAttributes)
|
result, err = cmdDelegateAdd(cniCmdArgs, k8sArgs, exec, kubeClient, multusConfig, interfaceAttributes)
|
||||||
@ -106,10 +111,10 @@ func (s *Server) HandleDelegateRequest(cmd string, k8sArgs *types.K8sArgs, cniCm
|
|||||||
default:
|
default:
|
||||||
return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd)
|
return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd)
|
||||||
}
|
}
|
||||||
logging.Verbosef("%s finished Delegate request %+v, result: %q, err: %v", cmd, *cniCmdArgs, string(result), err)
|
logging.Verbosef("%s finished Delegate request %s, result: %q, err: %v", cmd, printCmdArgs(cniCmdArgs), string(result), err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Prefix errors with request info for easier failure debugging
|
// Prefix errors with request info for easier failure debugging
|
||||||
return nil, fmt.Errorf("%+v ERRORED: %v", *cniCmdArgs, err)
|
return nil, fmt.Errorf("%s ERRORED: %v", printCmdArgs(cniCmdArgs), err)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -293,7 +298,7 @@ func (s *Server) handleDelegateRequest(r *http.Request) ([]byte, error) {
|
|||||||
result, err := s.HandleDelegateRequest(cmdType, k8sArgs, cniCmdArgs, s.exec, s.kubeclient, cr.InterfaceAttributes)
|
result, err := s.HandleDelegateRequest(cmdType, k8sArgs, cniCmdArgs, s.exec, s.kubeclient, cr.InterfaceAttributes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Prefix error with request information for easier debugging
|
// Prefix error with request information for easier debugging
|
||||||
return nil, fmt.Errorf("%+v %v", cniCmdArgs, err)
|
return nil, fmt.Errorf("%s %v", printCmdArgs(cniCmdArgs), err)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,9 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
|
||||||
"github.com/containernetworking/cni/libcni"
|
"github.com/containernetworking/cni/libcni"
|
||||||
"github.com/containernetworking/cni/pkg/skel"
|
"github.com/containernetworking/cni/pkg/skel"
|
||||||
@ -609,3 +612,13 @@ func CheckSystemNamespaces(namespace string, systemNamespaces []string) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetReadinessIndicatorFile waits for readinessIndicatorFile
|
||||||
|
func GetReadinessIndicatorFile(readinessIndicatorFile string) error {
|
||||||
|
pollDuration := 1000 * time.Millisecond
|
||||||
|
pollTimeout := 45 * time.Second
|
||||||
|
return utilwait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) {
|
||||||
|
_, err := os.Stat(readinessIndicatorFile)
|
||||||
|
return err == nil, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user