mirror of
https://github.com/rancher/steve.git
synced 2025-04-27 11:00:48 +00:00
This uses SQLite-backed informers provided by Lasso with https://github.com/rancher/lasso/pull/65 to implement Steve API (/v1/) functionality. This new functionality is available behind a feature flag to be specified at Steve startup See https://confluence.suse.com/pages/viewpage.action?pageId=1359086083 Co-authored-by: Ricardo Weir <ricardo.weir@suse.com> Co-authored-by: Michael Bolot <michael.bolot@suse.com> Co-authored-by: Silvio Moioli <silvio@moioli.net> Signed-off-by: Silvio Moioli <silvio@moioli.net>
262 lines
7.1 KiB
Go
262 lines
7.1 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
|
|
apiserver "github.com/rancher/apiserver/pkg/server"
|
|
"github.com/rancher/apiserver/pkg/types"
|
|
"github.com/rancher/dynamiclistener/server"
|
|
"github.com/rancher/steve/pkg/accesscontrol"
|
|
"github.com/rancher/steve/pkg/aggregation"
|
|
"github.com/rancher/steve/pkg/auth"
|
|
"github.com/rancher/steve/pkg/client"
|
|
"github.com/rancher/steve/pkg/clustercache"
|
|
schemacontroller "github.com/rancher/steve/pkg/controllers/schema"
|
|
"github.com/rancher/steve/pkg/resources"
|
|
"github.com/rancher/steve/pkg/resources/common"
|
|
"github.com/rancher/steve/pkg/resources/schemas"
|
|
"github.com/rancher/steve/pkg/schema"
|
|
"github.com/rancher/steve/pkg/schema/definitions"
|
|
"github.com/rancher/steve/pkg/server/handler"
|
|
"github.com/rancher/steve/pkg/server/router"
|
|
metricsStore "github.com/rancher/steve/pkg/stores/metrics"
|
|
"github.com/rancher/steve/pkg/stores/proxy"
|
|
"github.com/rancher/steve/pkg/stores/sqlpartition"
|
|
"github.com/rancher/steve/pkg/stores/sqlproxy"
|
|
"github.com/rancher/steve/pkg/summarycache"
|
|
"k8s.io/client-go/rest"
|
|
)
|
|
|
|
var ErrConfigRequired = errors.New("rest config is required")
|
|
|
|
type Server struct {
|
|
http.Handler
|
|
|
|
ClientFactory *client.Factory
|
|
ClusterCache clustercache.ClusterCache
|
|
SchemaFactory schema.Factory
|
|
RESTConfig *rest.Config
|
|
BaseSchemas *types.APISchemas
|
|
AccessSetLookup accesscontrol.AccessSetLookup
|
|
APIServer *apiserver.Server
|
|
ClusterRegistry string
|
|
Version string
|
|
|
|
authMiddleware auth.Middleware
|
|
controllers *Controllers
|
|
needControllerStart bool
|
|
next http.Handler
|
|
router router.RouterFunc
|
|
|
|
aggregationSecretNamespace string
|
|
aggregationSecretName string
|
|
SQLCache bool
|
|
}
|
|
|
|
type Options struct {
|
|
// Controllers If the controllers are passed in the caller must also start the controllers
|
|
Controllers *Controllers
|
|
ClientFactory *client.Factory
|
|
AccessSetLookup accesscontrol.AccessSetLookup
|
|
AuthMiddleware auth.Middleware
|
|
Next http.Handler
|
|
Router router.RouterFunc
|
|
AggregationSecretNamespace string
|
|
AggregationSecretName string
|
|
ClusterRegistry string
|
|
ServerVersion string
|
|
// SQLCache enables the SQLite-based lasso caching mechanism
|
|
SQLCache bool
|
|
}
|
|
|
|
func New(ctx context.Context, restConfig *rest.Config, opts *Options) (*Server, error) {
|
|
if opts == nil {
|
|
opts = &Options{}
|
|
}
|
|
|
|
server := &Server{
|
|
RESTConfig: restConfig,
|
|
ClientFactory: opts.ClientFactory,
|
|
AccessSetLookup: opts.AccessSetLookup,
|
|
authMiddleware: opts.AuthMiddleware,
|
|
controllers: opts.Controllers,
|
|
next: opts.Next,
|
|
router: opts.Router,
|
|
aggregationSecretNamespace: opts.AggregationSecretNamespace,
|
|
aggregationSecretName: opts.AggregationSecretName,
|
|
ClusterRegistry: opts.ClusterRegistry,
|
|
Version: opts.ServerVersion,
|
|
// SQLCache enables the SQLite-based lasso caching mechanism
|
|
SQLCache: opts.SQLCache,
|
|
}
|
|
|
|
if err := setup(ctx, server); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return server, server.start(ctx)
|
|
}
|
|
|
|
func setDefaults(server *Server) error {
|
|
if server.RESTConfig == nil {
|
|
return ErrConfigRequired
|
|
}
|
|
|
|
if server.controllers == nil {
|
|
var err error
|
|
server.controllers, err = NewController(server.RESTConfig, nil)
|
|
server.needControllerStart = true
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if server.next == nil {
|
|
server.next = http.NotFoundHandler()
|
|
}
|
|
|
|
if server.BaseSchemas == nil {
|
|
server.BaseSchemas = types.EmptyAPISchemas()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func setup(ctx context.Context, server *Server) error {
|
|
err := setDefaults(server)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cf := server.ClientFactory
|
|
if cf == nil {
|
|
cf, err = client.NewFactory(server.RESTConfig, server.authMiddleware != nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
server.ClientFactory = cf
|
|
}
|
|
|
|
asl := server.AccessSetLookup
|
|
if asl == nil {
|
|
asl = accesscontrol.NewAccessStore(ctx, true, server.controllers.RBAC)
|
|
}
|
|
|
|
ccache := clustercache.NewClusterCache(ctx, cf.AdminDynamicClient())
|
|
server.ClusterCache = ccache
|
|
sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
|
|
|
|
if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil {
|
|
return err
|
|
}
|
|
definitions.Register(ctx, server.BaseSchemas, server.controllers.K8s.Discovery(),
|
|
server.controllers.CRD.CustomResourceDefinition(), server.controllers.API.APIService())
|
|
|
|
summaryCache := summarycache.New(sf, ccache)
|
|
summaryCache.Start(ctx)
|
|
cols, err := common.NewDynamicColumns(server.RESTConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var onSchemasHandler schemacontroller.SchemasHandlerFunc
|
|
if server.SQLCache {
|
|
s, err := sqlproxy.NewProxyStore(cols, cf, summaryCache, nil)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
errStore := proxy.NewErrorStore(
|
|
proxy.NewUnformatterStore(
|
|
proxy.NewWatchRefresh(
|
|
sqlpartition.NewStore(
|
|
s,
|
|
asl,
|
|
),
|
|
asl,
|
|
),
|
|
),
|
|
)
|
|
store := metricsStore.NewMetricsStore(errStore)
|
|
// end store setup code
|
|
|
|
for _, template := range resources.DefaultSchemaTemplatesForStore(store, server.BaseSchemas, summaryCache, server.controllers.K8s.Discovery()) {
|
|
sf.AddTemplate(template)
|
|
}
|
|
|
|
onSchemasHandler = func(schemas *schema.Collection) error {
|
|
if err := ccache.OnSchemas(schemas); err != nil {
|
|
return err
|
|
}
|
|
if err := s.Reset(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
} else {
|
|
for _, template := range resources.DefaultSchemaTemplates(cf, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), server.controllers.Core.Namespace().Cache()) {
|
|
sf.AddTemplate(template)
|
|
}
|
|
onSchemasHandler = ccache.OnSchemas
|
|
}
|
|
|
|
schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf)
|
|
|
|
schemacontroller.Register(ctx,
|
|
cols,
|
|
server.controllers.K8s.Discovery(),
|
|
server.controllers.CRD.CustomResourceDefinition(),
|
|
server.controllers.API.APIService(),
|
|
server.controllers.K8s.AuthorizationV1().SelfSubjectAccessReviews(),
|
|
onSchemasHandler,
|
|
sf)
|
|
|
|
apiServer, handler, err := handler.New(server.RESTConfig, sf, server.authMiddleware, server.next, server.router)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
server.APIServer = apiServer
|
|
server.Handler = handler
|
|
server.SchemaFactory = sf
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Server) start(ctx context.Context) error {
|
|
if c.needControllerStart {
|
|
if err := c.controllers.Start(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Server) StartAggregation(ctx context.Context) {
|
|
aggregation.Watch(ctx, c.controllers.Core.Secret(), c.aggregationSecretNamespace,
|
|
c.aggregationSecretName, c)
|
|
}
|
|
|
|
func (c *Server) ListenAndServe(ctx context.Context, httpsPort, httpPort int, opts *server.ListenOpts) error {
|
|
if opts == nil {
|
|
opts = &server.ListenOpts{}
|
|
}
|
|
if opts.Storage == nil && opts.Secrets == nil {
|
|
opts.Secrets = c.controllers.Core.Secret()
|
|
}
|
|
|
|
c.StartAggregation(ctx)
|
|
|
|
if len(opts.TLSListenerConfig.SANs) == 0 {
|
|
opts.TLSListenerConfig.SANs = []string{"127.0.0.1"}
|
|
}
|
|
if err := server.ListenAndServe(ctx, httpsPort, httpPort, c, opts); err != nil {
|
|
return err
|
|
}
|
|
|
|
<-ctx.Done()
|
|
return ctx.Err()
|
|
}
|