server: simplify server start

Move server start code to a common function that both regular
and test code can use. Also shut down the server from the
testcases.

Signed-off-by: Dan Williams <dcbw@redhat.com>
This commit is contained in:
Dan Williams 2023-09-08 13:42:27 -05:00
parent 1605ffcad5
commit cec1a53cd8
3 changed files with 35 additions and 21 deletions

View File

@ -30,7 +30,6 @@ import (
"syscall" "syscall"
"time" "time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait" utilwait "k8s.io/apimachinery/pkg/util/wait"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
@ -207,15 +206,8 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf)
return fmt.Errorf("failed to start the CNI server using socket %s. Reason: %+v", api.SocketPath(daemonConfig.SocketDir), err) return fmt.Errorf("failed to start the CNI server using socket %s. Reason: %+v", api.SocketPath(daemonConfig.SocketDir), err)
} }
server.SetKeepAlivesEnabled(false) server.Start(ctx, l)
go func() {
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
logging.Debugf("open for business")
if err := server.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, 0)
}()
go func() { go func() {
<-ctx.Done() <-ctx.Done()
server.Shutdown(context.Background()) server.Shutdown(context.Background())

View File

@ -16,6 +16,7 @@ package server
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -38,6 +39,9 @@ import (
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
) )
const ( const (
@ -180,6 +184,8 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
), ),
}, },
} }
s.SetKeepAlivesEnabled(false)
// register metrics // register metrics
prometheus.MustRegister(s.metrics.requestCounter) prometheus.MustRegister(s.metrics.requestCounter)
@ -249,6 +255,18 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
return s, nil return s, nil
} }
// Start starts the server and begins serving on the given listener
func (s *Server) Start(ctx context.Context, l net.Listener) {
go func() {
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
logging.Debugf("open for business")
if err := s.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, 0)
}()
}
func (s *Server) handleCNIRequest(r *http.Request) ([]byte, error) { func (s *Server) handleCNIRequest(r *http.Request) ([]byte, error) {
var cr api.Request var cr api.Request
b, err := io.ReadAll(r.Body) b, err := io.ReadAll(r.Body)

View File

@ -30,8 +30,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@ -102,6 +100,8 @@ var _ = Describe(suiteName, func() {
cniServer *Server cniServer *Server
K8sClient *k8s.ClientInfo K8sClient *k8s.ClientInfo
netns ns.NetNS netns ns.NetNS
ctx context.Context
cancel context.CancelFunc
) )
BeforeEach(func() { BeforeEach(func() {
@ -109,7 +109,9 @@ var _ = Describe(suiteName, func() {
K8sClient = fakeK8sClient() K8sClient = fakeK8sClient()
Expect(FilesystemPreRequirements(thickPluginRunDir)).To(Succeed()) Expect(FilesystemPreRequirements(thickPluginRunDir)).To(Succeed())
cniServer, err = startCNIServer(thickPluginRunDir, K8sClient, nil)
ctx, cancel = context.WithCancel(context.TODO())
cniServer, err = startCNIServer(ctx, thickPluginRunDir, K8sClient, nil)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
netns, err = testutils.NewNS() netns, err = testutils.NewNS()
@ -121,6 +123,7 @@ var _ = Describe(suiteName, func() {
}) })
AfterEach(func() { AfterEach(func() {
cancel()
unregisterMetrics(cniServer) unregisterMetrics(cniServer)
Expect(cniServer.Close()).To(Succeed()) Expect(cniServer.Close()).To(Succeed())
Expect(teardownCNIEnv()).To(Succeed()) Expect(teardownCNIEnv()).To(Succeed())
@ -151,6 +154,8 @@ var _ = Describe(suiteName, func() {
cniServer *Server cniServer *Server
K8sClient *k8s.ClientInfo K8sClient *k8s.ClientInfo
netns ns.NetNS netns ns.NetNS
ctx context.Context
cancel context.CancelFunc
) )
BeforeEach(func() { BeforeEach(func() {
@ -163,7 +168,9 @@ var _ = Describe(suiteName, func() {
}` }`
Expect(FilesystemPreRequirements(thickPluginRunDir)).To(Succeed()) Expect(FilesystemPreRequirements(thickPluginRunDir)).To(Succeed())
cniServer, err = startCNIServer(thickPluginRunDir, K8sClient, []byte(dummyServerConfig))
ctx, cancel = context.WithCancel(context.TODO())
cniServer, err = startCNIServer(ctx, thickPluginRunDir, K8sClient, []byte(dummyServerConfig))
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
netns, err = testutils.NewNS() netns, err = testutils.NewNS()
@ -175,6 +182,7 @@ var _ = Describe(suiteName, func() {
}) })
AfterEach(func() { AfterEach(func() {
cancel()
unregisterMetrics(cniServer) unregisterMetrics(cniServer)
Expect(cniServer.Close()).To(Succeed()) Expect(cniServer.Close()).To(Succeed())
Expect(teardownCNIEnv()).To(Succeed()) Expect(teardownCNIEnv()).To(Succeed())
@ -245,7 +253,7 @@ func createFakePod(k8sClient *k8s.ClientInfo, podName string) error {
return err return err
} }
func startCNIServer(runDir string, k8sClient *k8s.ClientInfo, servConfig []byte) (*Server, error) { func startCNIServer(ctx context.Context, runDir string, k8sClient *k8s.ClientInfo, servConfig []byte) (*Server, error) {
const period = 0 const period = 0
cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig) cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig)
@ -258,12 +266,8 @@ func startCNIServer(runDir string, k8sClient *k8s.ClientInfo, servConfig []byte)
return nil, fmt.Errorf("failed to start the CNI server using socket %s. Reason: %+v", api.SocketPath(runDir), err) return nil, fmt.Errorf("failed to start the CNI server using socket %s. Reason: %+v", api.SocketPath(runDir), err)
} }
cniServer.SetKeepAlivesEnabled(false) cniServer.Start(ctx, l)
go utilwait.Forever(func() {
if err := cniServer.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, period)
return cniServer, nil return cniServer, nil
} }