Add net-attach-def informer for thick plugin

This change introduces net-attach-def informer in multus-daemon,
thick pluign case. It could reduced API calls to get
net-attach-def.
This commit is contained in:
Tomofumi Hayashi
2024-01-20 02:04:21 +09:00
parent 3477c9c827
commit 6ac6fe675f
24 changed files with 796 additions and 121 deletions

View File

@@ -40,6 +40,11 @@ import (
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types"
netdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
netdefclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
netdefinformer "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
netdefinformerv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions/k8s.cni.cncf.io/v1"
kapi "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -161,6 +166,21 @@ func informerObjectTrim(obj interface{}) (interface{}, error) {
return obj, nil
}
func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) {
const resyncInterval time.Duration = 1 * time.Second
informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netdefClient, resyncInterval)
netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return netdefinformerv1.NewNetworkAttachmentDefinitionInformer(
client,
kapi.NamespaceAll,
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
})
return informerFactory, netdefInformer
}
func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) {
var tweakFunc internalinterfaces.TweakListOptionsFunc
if nodeName != "" {
@@ -171,9 +191,7 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali
}
}
// Multus only watches pods so there's no possibility of race conditions
// between multiple resources that might require a resync to resolve
const resyncInterval time.Duration = 0 * time.Second
const resyncInterval time.Duration = 1 * time.Second
informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(kubeClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim))
podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
@@ -246,6 +264,8 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe
func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME"))
netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient)
kubeClient.SetK8sClientInformers(podInformer, netdefInformer)
router := http.NewServeMux()
s := &Server{
@@ -267,6 +287,8 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
},
informerFactory: informerFactory,
podInformer: podInformer,
netdefInformerFactory: netdefInformerFactory,
netdefInformer: netdefInformer,
ignoreReadinessIndicator: ignoreReadinessIndicator,
}
s.SetKeepAlivesEnabled(false)
@@ -343,6 +365,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
// Start starts the server and begins serving on the given listener
func (s *Server) Start(ctx context.Context, l net.Listener) {
s.informerFactory.Start(ctx.Done())
s.netdefInformerFactory.Start(ctx.Done())
// Give the initial sync some time to complete in large clusters, but
// don't wait forever
@@ -352,6 +375,14 @@ func (s *Server) Start(ctx context.Context, l net.Listener) {
}
waitCancel()
// Give the initial sync some time to complete in large clusters, but
// don't wait forever
waitCtx, waitCancel = context.WithTimeout(ctx, 20*time.Second)
if !cache.WaitForCacheSync(waitCtx.Done(), s.netdefInformer.HasSynced) {
logging.Errorf("failed to sync net-attach-def informer cache")
}
waitCancel()
go func() {
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
logging.Debugf("open for business")
@@ -562,7 +593,7 @@ func (s *Server) cmdAdd(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) ([]byte,
}
logging.Debugf("CmdAdd for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs)
result, err := multus.CmdAdd(cmdArgs, s.exec, s.kubeclient, s.podInformer)
result, err := multus.CmdAdd(cmdArgs, s.exec, s.kubeclient)
if err != nil {
return nil, fmt.Errorf("error configuring pod [%s/%s] networking: %v", namespace, podName, err)
}
@@ -577,7 +608,7 @@ func (s *Server) cmdDel(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error {
}
logging.Debugf("CmdDel for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs)
return multus.CmdDel(cmdArgs, s.exec, s.kubeclient, s.podInformer)
return multus.CmdDel(cmdArgs, s.exec, s.kubeclient)
}
func (s *Server) cmdCheck(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error {
@@ -611,7 +642,7 @@ func (s *Server) cmdDelegateAdd(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs, m
if namespace == "" || podName == "" {
return nil, fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace)
}
pod, err := multus.GetPod(s.kubeclient, s.podInformer, k8sArgs, false)
pod, err := multus.GetPod(s.kubeclient, k8sArgs, false)
if err != nil {
return nil, err
}
@@ -665,7 +696,7 @@ func (s *Server) cmdDelegateDel(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs, m
if namespace == "" || podName == "" {
return fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace)
}
pod, err := multus.GetPod(s.kubeclient, s.podInformer, k8sArgs, false)
pod, err := multus.GetPod(s.kubeclient, k8sArgs, false)
if err != nil {
return err
}

View File

@@ -210,7 +210,7 @@ func fakeK8sClient() *k8s.ClientInfo {
const magicNumber = 10
return &k8s.ClientInfo{
Client: fake.NewSimpleClientset(),
NetClient: netfake.NewSimpleClientset().K8sCniCncfIoV1(),
NetClient: netfake.NewSimpleClientset(),
EventRecorder: record.NewFakeRecorder(magicNumber),
}
}

View File

@@ -24,6 +24,7 @@ import (
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient"
netdefinformer "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/tools/cache"
)
@@ -48,13 +49,15 @@ type Metrics struct {
// the CNI shim requests issued when a pod is added / removed.
type Server struct {
http.Server
rundir string
kubeclient *k8sclient.ClientInfo
exec invoke.Exec
serverConfig []byte
metrics *Metrics
informerFactory internalinterfaces.SharedInformerFactory
podInformer cache.SharedIndexInformer
rundir string
kubeclient *k8sclient.ClientInfo
exec invoke.Exec
serverConfig []byte
metrics *Metrics
informerFactory internalinterfaces.SharedInformerFactory
podInformer cache.SharedIndexInformer
netdefInformerFactory netdefinformer.SharedInformerFactory
netdefInformer cache.SharedIndexInformer
ignoreReadinessIndicator bool
}