1
0
mirror of https://github.com/rancher/steve.git synced 2025-04-28 03:10:32 +00:00

Add support for listing helm releases

This commit is contained in:
Darren Shepherd 2020-06-05 13:30:33 -07:00
parent de111ebb62
commit b60484bf2e
14 changed files with 1093 additions and 154 deletions

View File

@ -0,0 +1,58 @@
package helm
import (
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schema/converter"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func DropHelmData(request *types.APIRequest, resource *types.RawResource) {
data := resource.APIObject.Data()
if data.String("metadata", "labels", "owner") == "helm" ||
data.String("metadata", "labels", "OWNER") == "TILLER" {
if data.String("data", "release") != "" {
delete(data.Map("data"), "release")
}
}
}
func FormatRelease(request *types.APIRequest, resource *types.RawResource) {
obj, ok := resource.APIObject.Object.(runtime.Object)
if !ok {
return
}
release, err := ToRelease(obj, SchemeBasedNamespaceLookup(request.Schemas))
if err == ErrNotHelmRelease {
return
} else if err != nil {
logrus.Errorf("failed to render helm release: %v", err)
return
}
var (
data = resource.APIObject.Data()
namespace = data.String("metadata", "namespace")
name = data.String("metadata", "name")
)
switch data.String("kind") {
case "Secret":
resource.ID = namespace + "/s:" + name
case "ConfigMap":
resource.ID = namespace + "/c:" + name
}
resource.Links["self"] = request.URLBuilder.ResourceLink(request.Schema, resource.ID)
resource.APIObject.Object = release
}
func SchemeBasedNamespaceLookup(schemas *types.APISchemas) IsNamespaced {
return func(gvk schema.GroupVersionKind) bool {
schema := schemas.LookupSchema(converter.GVKToSchemaID(gvk))
return schema != nil && attributes.Namespaced(schema)
}
}

View File

@ -0,0 +1,158 @@
package helm
import (
"bytes"
"compress/gzip"
"encoding/base64"
"io/ioutil"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/sirupsen/logrus"
rspb "k8s.io/helm/pkg/proto/hapi/release"
"sigs.k8s.io/yaml"
)
var (
readmes = map[string]bool{
"readme": true,
"readme.txt": true,
"readme.md": true,
}
statusMapping = map[string]Status{
"UNKNOWN": StatusUnknown,
"DEPLOYED": StatusDeployed,
"DELETED": StatusUninstalled,
"SUPERSEDED": StatusSuperseded,
"FAILED": StatusFailed,
"DELETING": StatusUninstalling,
"PENDING_INSTALL": StatusPendingInstall,
"PENDING_UPGRADE": StatusPendingUpgrade,
"PENDING_ROLLBACK": StatusPendingRollback,
}
)
func isHelm2(labels map[string]string) bool {
return labels["OWNER"] == "TILLER"
}
func fromHelm2Data(data string, isNamespaced IsNamespaced) (*Release, error) {
release, err := decodeHelm2(data)
if err != nil {
return nil, err
}
return fromHelm2ReleaseToRelease(release, isNamespaced)
}
func toTime(t *timestamp.Timestamp) time.Time {
return time.Unix(t.GetSeconds(), int64(t.GetNanos())).UTC()
}
func fromHelm2ReleaseToRelease(release *rspb.Release, isNamespaced IsNamespaced) (*Release, error) {
var (
err error
)
hr := &Release{
Name: release.Name,
Info: &Info{
FirstDeployed: toTime(release.GetInfo().GetFirstDeployed()),
LastDeployed: toTime(release.GetInfo().GetLastDeployed()),
Deleted: toTime(release.GetInfo().GetDeleted()),
Description: release.GetInfo().GetDescription(),
Status: statusMapping[release.GetInfo().GetStatus().GetCode().String()],
Notes: release.GetInfo().GetStatus().GetNotes(),
},
Chart: &Chart{
Values: toMap(release.Namespace, release.Name, release.GetChart().GetValues().GetRaw()),
Metadata: &Metadata{
Name: release.GetChart().GetMetadata().GetName(),
Home: release.GetChart().GetMetadata().GetHome(),
Sources: release.GetChart().GetMetadata().GetSources(),
Version: release.GetChart().GetMetadata().GetVersion(),
Description: release.GetChart().GetMetadata().GetDescription(),
Keywords: release.GetChart().GetMetadata().GetKeywords(),
Icon: release.GetChart().GetMetadata().GetIcon(),
Condition: release.GetChart().GetMetadata().GetCondition(),
Tags: release.GetChart().GetMetadata().GetTags(),
AppVersion: release.GetChart().GetMetadata().GetAppVersion(),
Deprecated: release.GetChart().GetMetadata().GetDeprecated(),
Annotations: release.GetChart().GetMetadata().GetAnnotations(),
KubeVersion: release.GetChart().GetMetadata().GetKubeVersion(),
},
},
Values: toMap(release.Namespace, release.Name, release.GetConfig().GetRaw()),
Version: int(release.Version),
Namespace: release.Namespace,
HelmMajorVersion: 3,
}
for _, m := range release.GetChart().GetMetadata().GetMaintainers() {
if m == nil {
continue
}
hr.Chart.Metadata.Maintainers = append(hr.Chart.Metadata.Maintainers, Maintainer{
Name: m.GetName(),
Email: m.GetEmail(),
URL: m.GetUrl(),
})
}
for _, f := range release.GetChart().GetFiles() {
if f == nil {
continue
}
if readmes[strings.ToLower(f.TypeUrl)] {
hr.Info.Readme = string(f.Value)
}
}
hr.Resources, err = resourcesFromManifest(release.Namespace, release.Manifest, isNamespaced)
return hr, err
}
func toMap(namespace, name string, manifest string) map[string]interface{} {
values := map[string]interface{}{}
if manifest == "" {
return values
}
if err := yaml.Unmarshal([]byte(manifest), &values); err != nil {
logrus.Errorf("failed to unmarshal yaml for %s/%s", namespace, name)
}
return values
}
func decodeHelm2(data string) (*rspb.Release, error) {
b, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return nil, err
}
// For backwards compatibility with releases that were stored before
// compression was introduced we skip decompression if the
// gzip magic header is not found
if bytes.Equal(b[0:3], magicGzip) {
r, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, err
}
b2, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
b = b2
}
var rls rspb.Release
// unmarshal protobuf bytes
if err := proto.Unmarshal(b, &rls); err != nil {
return nil, err
}
return &rls, nil
}

View File

@ -0,0 +1,132 @@
package helm
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"io/ioutil"
"strings"
"helm.sh/helm/v3/pkg/release"
)
func isHelm3(labels map[string]string) bool {
return labels["owner"] == "helm"
}
func fromHelm3Data(data string, isNamespaced IsNamespaced) (*Release, error) {
release, err := decodeHelm3(data)
if err != nil {
return nil, err
}
return fromHelm3ReleaseToRelease(release, isNamespaced)
}
func fromHelm3ReleaseToRelease(release *release.Release, isNamespaced IsNamespaced) (*Release, error) {
var (
info = &Info{}
chart = &Chart{}
err error
)
if release.Info != nil {
info = &Info{
FirstDeployed: release.Info.FirstDeployed.Time,
LastDeployed: release.Info.LastDeployed.Time,
Deleted: release.Info.Deleted.Time,
Description: release.Info.Description,
Status: Status(release.Info.Status),
Notes: release.Info.Notes,
}
}
if release.Chart != nil {
chart = &Chart{
Values: release.Chart.Values,
}
if release.Chart.Metadata != nil {
chart.Metadata = &Metadata{
Name: release.Chart.Metadata.Name,
Home: release.Chart.Metadata.Home,
Sources: release.Chart.Metadata.Sources,
Version: release.Chart.Metadata.Version,
Description: release.Chart.Metadata.Description,
Keywords: release.Chart.Metadata.Keywords,
Icon: release.Chart.Metadata.Icon,
APIVersion: release.Chart.Metadata.APIVersion,
Condition: release.Chart.Metadata.Condition,
Tags: release.Chart.Metadata.Tags,
AppVersion: release.Chart.Metadata.AppVersion,
Deprecated: release.Chart.Metadata.Deprecated,
Annotations: release.Chart.Metadata.Annotations,
KubeVersion: release.Chart.Metadata.KubeVersion,
Type: release.Chart.Metadata.Type,
}
for _, m := range release.Chart.Metadata.Maintainers {
if m == nil {
continue
}
chart.Metadata.Maintainers = append(chart.Metadata.Maintainers, Maintainer{
Name: m.Name,
Email: m.Email,
URL: m.URL,
})
}
}
for _, f := range release.Chart.Files {
if f == nil {
continue
}
if readmes[strings.ToLower(f.Name)] {
info.Readme = string(f.Data)
}
}
}
hr := &Release{
Name: release.Name,
Info: info,
Chart: chart,
Values: release.Config,
Resources: nil,
Version: release.Version,
Namespace: release.Namespace,
HelmMajorVersion: 3,
}
hr.Resources, err = resourcesFromManifest(release.Namespace, release.Manifest, isNamespaced)
return hr, err
}
func decodeHelm3(data string) (*release.Release, error) {
b, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return nil, err
}
// For backwards compatibility with releases that were stored before
// compression was introduced we skip decompression if the
// gzip magic header is not found
if bytes.Equal(b[0:3], magicGzip) {
r, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, err
}
b2, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
b = b2
}
var rls release.Release
// unmarshal release object bytes
if err := json.Unmarshal(b, &rls); err != nil {
return nil, err
}
return &rls, nil
}

View File

@ -0,0 +1,20 @@
package helm
import (
"net/http"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/steve/pkg/server/store/partition"
)
func Register(schemas *types.APISchemas) {
schemas.InternalSchemas.TypeName("helmrelease", Release{})
schemas.MustImportAndCustomize(Release{}, func(schema *types.APISchema) {
schema.CollectionMethods = []string{http.MethodGet}
schema.ResourceMethods = []string{http.MethodGet}
schema.Store = &partition.Store{
Partitioner: &partitioner{},
}
schema.Formatter = FormatRelease
})
}

View File

@ -0,0 +1,93 @@
package helm
import (
"bytes"
"encoding/base64"
"errors"
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/yaml"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
meta2 "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
var (
ErrNotHelmRelease = errors.New("not helm release")
magicGzip = []byte{0x1f, 0x8b, 0x08}
)
type IsNamespaced func(gvk schema.GroupVersionKind) bool
func ToRelease(obj runtime.Object, isNamespaced IsNamespaced) (*Release, error) {
releaseData, err := getReleaseDataAndKind(obj)
if err != nil {
return nil, err
}
meta, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
switch {
case isHelm3(meta.GetLabels()):
return fromHelm3Data(releaseData, isNamespaced)
case isHelm2(meta.GetLabels()):
return fromHelm2Data(releaseData, isNamespaced)
}
return nil, ErrNotHelmRelease
}
func getReleaseDataAndKind(obj runtime.Object) (string, error) {
switch t := obj.(type) {
case *unstructured.Unstructured:
releaseData := data.Object(t.Object).String("data", "release")
switch t.GetKind() {
case "ConfigMap":
return releaseData, nil
case "Secret":
data, err := base64.StdEncoding.DecodeString(releaseData)
if err != nil {
return "", err
}
return string(data), nil
}
case *corev1.ConfigMap:
return t.Data["release"], nil
case *corev1.Secret:
return string(t.Data["release"]), nil
}
return "", ErrNotHelmRelease
}
func resourcesFromManifest(namespace string, manifest string, isNamespaced IsNamespaced) (result []Resource, err error) {
objs, err := yaml.ToObjects(bytes.NewReader([]byte(manifest)))
if err != nil {
return nil, err
}
for _, obj := range objs {
meta, err := meta2.Accessor(obj)
if err != nil {
return nil, err
}
r := Resource{
Name: meta.GetName(),
Namespace: meta.GetNamespace(),
}
gvk := obj.GetObjectKind().GroupVersionKind()
if isNamespaced != nil && isNamespaced(gvk) && r.Namespace == "" {
r.Namespace = namespace
}
r.APIVersion, r.Kind = gvk.ToAPIVersionAndKind()
result = append(result, r)
}
return result, nil
}

View File

@ -0,0 +1,104 @@
package helm
import (
"strings"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/steve/pkg/server/store/partition"
"github.com/rancher/steve/pkg/server/store/selector"
"github.com/rancher/steve/pkg/server/store/switchschema"
"github.com/rancher/wrangler/pkg/schemas/validation"
"k8s.io/apimachinery/pkg/labels"
)
var (
configMap2 = target{
schemaType: "configmap",
version: "2",
selector: labels.SelectorFromSet(labels.Set{
"OWNER": "TILLER",
}),
}
secret2 = target{
schemaType: "secret",
version: "2",
selector: labels.SelectorFromSet(labels.Set{
"OWNER": "TILLER",
}),
}
secret3 = target{
schemaType: "secret",
version: "3",
selector: labels.SelectorFromSet(labels.Set{
"owner": "helm",
}),
}
all = []partition.Partition{
configMap2,
secret2,
secret3,
}
)
type target struct {
schemaType string
version string
selector labels.Selector
}
func (t target) Name() string {
return t.schemaType + t.version
}
type partitioner struct {
}
func (p *partitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) {
if id == "" {
return nil, validation.Unauthorized
}
t := strings.SplitN(id, ":", 2)[0]
if t == "c" {
return configMap2, nil
} else if t == "s" {
return secret2, nil
}
return nil, validation.NotFound
}
func (p *partitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, error) {
return all, nil
}
func (p *partitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) {
target := partition.(target)
schema := apiOp.Schemas.LookupSchema(target.schemaType)
return &stripIDPrefix{
Store: &selector.Store{
Selector: target.selector,
Store: &switchschema.Store{
Schema: schema,
},
},
}, nil
}
type stripIDPrefix struct {
types.Store
}
func stripPrefix(s string) string {
return strings.TrimPrefix(strings.TrimPrefix(s, "c:"), "s:")
}
func (s *stripIDPrefix) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
return s.Store.Delete(apiOp, schema, stripPrefix(id))
}
func (s *stripIDPrefix) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
return s.Store.ByID(apiOp, schema, stripPrefix(id))
}
func (s *stripIDPrefix) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
return s.Store.Update(apiOp, schema, data, stripPrefix(id))
}

View File

@ -0,0 +1,130 @@
package helm
import (
"time"
)
type Release struct {
// Name is the name of the release
Name string `json:"name,omitempty"`
// Info provides information about a release
Info *Info `json:"info,omitempty"`
// Chart is the chart that was released.
Chart *Chart `json:"chart,omitempty"`
// Config is the set of extra Values added to the chart.
// These values override the default values inside of the chart.
Values map[string]interface{} `json:"values,omitempty"`
// Manifest is the string representation of the rendered template.
Resources []Resource `json:"resources,omitempty"`
// Version is an int which represents the version of the release.
Version int `json:"version,omitempty"`
// Namespace is the kubernetes namespace of the release.
Namespace string `json:"namespace,omitempty"`
HelmMajorVersion int `json:"helmVersion,omitempty"`
}
type Resource struct {
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}
// Chart is a helm package that contains metadata, a default config, zero or more
// optionally parameterizable templates, and zero or more charts (dependencies).
type Chart struct {
// Metadata is the contents of the Chartfile.
Metadata *Metadata `json:"metadata"`
// Values are default config for this chart.
Values map[string]interface{} `json:"values"`
}
// Metadata for a Chart file. This models the structure of a Chart.yaml file.
type Metadata struct {
// The name of the chart
Name string `json:"name,omitempty"`
// The URL to a relevant project page, git repo, or contact person
Home string `json:"home,omitempty"`
// Source is the URL to the source code of this chart
Sources []string `json:"sources,omitempty"`
// A SemVer 2 conformant version string of the chart
Version string `json:"version,omitempty"`
// A one-sentence description of the chart
Description string `json:"description,omitempty"`
// A list of string keywords
Keywords []string `json:"keywords,omitempty"`
// A list of name and URL/email address combinations for the maintainer(s)
Maintainers []Maintainer `json:"maintainers,omitempty"`
// The URL to an icon file.
Icon string `json:"icon,omitempty"`
// The API Version of this chart.
APIVersion string `json:"apiVersion,omitempty"`
// The condition to check to enable chart
Condition string `json:"condition,omitempty"`
// The tags to check to enable chart
Tags string `json:"tags,omitempty"`
// The version of the application enclosed inside of this chart.
AppVersion string `json:"appVersion,omitempty"`
// Whether or not this chart is deprecated
Deprecated bool `json:"deprecated,omitempty"`
// Annotations are additional mappings uninterpreted by Helm,
// made available for inspection by other applications.
Annotations map[string]string `json:"annotations,omitempty"`
// KubeVersion is a SemVer constraint specifying the version of Kubernetes required.
KubeVersion string `json:"kubeVersion,omitempty"`
// Specifies the chart type: application or library
Type string `json:"type,omitempty"`
}
// Maintainer describes a Chart maintainer.
type Maintainer struct {
// Name is a user name or organization name
Name string `json:"name,omitempty"`
// Email is an optional email address to contact the named maintainer
Email string `json:"email,omitempty"`
// URL is an optional URL to an address for the named maintainer
URL string `json:"url,omitempty"`
}
// Info describes release information.
type Info struct {
// FirstDeployed is when the release was first deployed.
FirstDeployed time.Time `json:"firstDeployed,omitempty"`
// LastDeployed is when the release was last deployed.
LastDeployed time.Time `json:"lastDeployed,omitempty"`
// Deleted tracks when this object was deleted.
Deleted time.Time `json:"deleted"`
// Description is human-friendly "log entry" about this release.
Description string `json:"description,omitempty"`
// Status is the current state of the release
Status Status `json:"status,omitempty" wrangler:"options=unknown|deployed|uninstalled|superseded|failed|uninstalling|pending-install|pending-upgrade|pending-rollback"`
// Contains the rendered templates/NOTES.txt if available
Notes string `json:"notes,omitempty"`
Readme string `json:"readme,omitempty"`
}
type Status string
// Describe the status of a release
// NOTE: Make sure to update cmd/helm/status.go when adding or modifying any of these statuses.
const (
// StatusUnknown indicates that a release is in an uncertain state.
StatusUnknown Status = "unknown"
// StatusDeployed indicates that the release has been pushed to Kubernetes.
StatusDeployed Status = "deployed"
// StatusUninstalled indicates that a release has been uninstalled from Kubernetes.
StatusUninstalled Status = "uninstalled"
// StatusSuperseded indicates that this release object is outdated and a newer one exists.
StatusSuperseded Status = "superseded"
// StatusFailed indicates that the release was not successfully deployed.
StatusFailed Status = "failed"
// StatusUninstalling indicates that a uninstall operation is underway.
StatusUninstalling Status = "uninstalling"
// StatusPendingInstall indicates that an install operation is underway.
StatusPendingInstall Status = "pending-install"
// StatusPendingUpgrade indicates that an upgrade operation is underway.
StatusPendingUpgrade Status = "pending-upgrade"
// StatusPendingRollback indicates that an rollback operation is underway.
StatusPendingRollback Status = "pending-rollback"
)

View File

@ -14,6 +14,7 @@ import (
"github.com/rancher/steve/pkg/server/resources/clusters"
"github.com/rancher/steve/pkg/server/resources/common"
"github.com/rancher/steve/pkg/server/resources/counts"
"github.com/rancher/steve/pkg/server/resources/helm"
"github.com/rancher/steve/pkg/server/resources/userpreferences"
"github.com/rancher/steve/pkg/server/store/proxy"
"k8s.io/client-go/discovery"
@ -25,6 +26,7 @@ func DefaultSchemas(ctx context.Context, baseSchema *types.APISchemas, ccache cl
apiroot.Register(baseSchema, []string{"v1"}, []string{"proxy:/apis"})
userpreferences.Register(baseSchema, cg)
clusters.Register(ctx, baseSchema, cg, ccache)
helm.Register(baseSchema)
return baseSchema
}
@ -32,5 +34,13 @@ func DefaultSchemaTemplates(cf *client.Factory, lookup accesscontrol.AccessSetLo
return []schema.Template{
common.DefaultTemplate(cf, lookup),
apigroups.Template(discovery),
{
ID: "configmap",
Formatter: helm.DropHelmData,
},
{
ID: "secret",
Formatter: helm.DropHelmData,
},
}
}

View File

@ -1,4 +1,4 @@
package proxy
package partition
import (
"context"
@ -10,6 +10,10 @@ import (
"golang.org/x/sync/semaphore"
)
type Partition interface {
Name() string
}
type ParallelPartitionLister struct {
Lister PartitionLister
Concurrency int64
@ -40,12 +44,12 @@ func (p *ParallelPartitionLister) Continue() string {
return base64.StdEncoding.EncodeToString(bytes)
}
func indexOrZero(partitions []Partition, namespace string) int {
if namespace == "" {
func indexOrZero(partitions []Partition, name string) int {
if name == "" {
return 0
}
for i, partition := range partitions {
if partition.Namespace == namespace {
if partition.Name() == name {
return i
}
}
@ -74,11 +78,11 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
}
type listState struct {
Revision string `json:"r,omitempty"`
PartitionNamespace string `json:"p,omitempty"`
Continue string `json:"c,omitempty"`
Offset int `json:"o,omitempty"`
Limit int `json:"l,omitempty"`
Revision string `json:"r,omitempty"`
PartitionName string `json:"p,omitempty"`
Continue string `json:"c,omitempty"`
Offset int `json:"o,omitempty"`
Limit int `json:"l,omitempty"`
}
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) {
@ -97,7 +101,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
close(result)
}()
for i := indexOrZero(p.Partitions, state.PartitionNamespace); i < len(p.Partitions); i++ {
for i := indexOrZero(p.Partitions, state.PartitionName); i < len(p.Partitions); i++ {
if capacity <= 0 || isDone(ctx) {
break
}
@ -129,7 +133,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
for {
cont := ""
if partition.Namespace == state.PartitionNamespace {
if partition.Name() == state.PartitionName {
cont = state.Continue
}
list, err := p.Lister(ctx, partition, cont, state.Revision, limit)
@ -150,7 +154,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
p.revision = list.Revision
}
if state.PartitionNamespace == partition.Namespace && state.Offset > 0 && state.Offset < len(list.Objects) {
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) {
list.Objects = list.Objects[state.Offset:]
}
@ -158,11 +162,11 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
result <- list.Objects[:capacity]
// save state to redo this list at this offset
p.state = &listState{
Revision: list.Revision,
PartitionNamespace: partition.Namespace,
Continue: cont,
Offset: capacity,
Limit: limit,
Revision: list.Revision,
PartitionName: partition.Name(),
Continue: cont,
Offset: capacity,
Limit: limit,
}
capacity = 0
return nil
@ -174,7 +178,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
}
// loop again and get more data
state.Continue = list.Continue
state.PartitionNamespace = partition.Namespace
state.PartitionName = partition.Name()
state.Offset = 0
}
}

View File

@ -0,0 +1,176 @@
package partition
import (
"context"
"net/http"
"strconv"
"github.com/rancher/steve/pkg/schemaserver/types"
"golang.org/x/sync/errgroup"
)
type Partitioner interface {
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
All(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]Partition, error)
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error)
}
type Store struct {
Partitioner Partitioner
}
func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) {
p, err := s.Partitioner.Lookup(apiOp, schema, verb, id)
if err != nil {
return nil, err
}
return s.Partitioner.Store(apiOp, p)
}
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "delete", id)
if err != nil {
return types.APIObject{}, err
}
return target.Delete(apiOp, schema, id)
}
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "get", id)
if err != nil {
return types.APIObject{}, err
}
return target.ByID(apiOp, schema, id)
}
func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
cont string, revision string, limit int) (types.APIObjectList, error) {
store, err := s.Partitioner.Store(apiOp, partition)
if err != nil {
return types.APIObjectList{}, err
}
req := apiOp.Clone()
req.Request = req.Request.Clone(ctx)
values := req.Request.URL.Query()
values.Set("continue", cont)
values.Set("revision", revision)
if limit > 0 {
values.Set("limit", strconv.Itoa(limit))
} else {
values.Del("limit")
}
req.Request.URL.RawQuery = values.Encode()
return store.List(req, schema)
}
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
var (
result types.APIObjectList
)
paritions, err := s.Partitioner.All(apiOp, schema, "list")
if err != nil {
return result, err
}
lister := ParallelPartitionLister{
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) {
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
},
Concurrency: 3,
Partitions: paritions,
}
resume := apiOp.Request.URL.Query().Get("continue")
limit := getLimit(apiOp.Request)
list, err := lister.List(apiOp.Context(), limit, resume)
if err != nil {
return result, err
}
for items := range list {
result.Objects = append(result.Objects, items...)
}
result.Revision = lister.Revision()
result.Continue = lister.Continue()
return result, nil
}
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "create", "")
if err != nil {
return types.APIObject{}, err
}
return target.Create(apiOp, schema, data)
}
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "update", id)
if err != nil {
return types.APIObject{}, err
}
return target.Update(apiOp, schema, data, id)
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
partitions, err := s.Partitioner.All(apiOp, schema, "watch")
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(apiOp.Context())
defer cancel()
apiOp = apiOp.WithContext(ctx)
eg := errgroup.Group{}
response := make(chan types.APIEvent)
for _, partition := range partitions {
store, err := s.Partitioner.Store(apiOp, partition)
if err != nil {
return nil, err
}
eg.Go(func() error {
defer cancel()
c, err := store.Watch(apiOp, schema, wr)
if err != nil {
return err
}
for i := range c {
response <- i
}
return nil
})
}
go func() {
defer close(response)
<-ctx.Done()
eg.Wait()
}()
return response, nil
}
func getLimit(req *http.Request) int {
limitString := req.URL.Query().Get("limit")
limit, err := strconv.Atoi(limitString)
if err != nil {
limit = 0
}
if limit <= 0 {
limit = 100000
}
return limit
}

View File

@ -13,6 +13,7 @@ import (
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/steve/pkg/server/store/partition"
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/schemas/validation"
"github.com/sirupsen/logrus"
@ -50,9 +51,11 @@ type Store struct {
func NewProxyStore(clientGetter ClientGetter, lookup accesscontrol.AccessSetLookup) types.Store {
return &errorStore{
Store: &WatchRefresh{
Store: &RBACStore{
Store: &Store{
clientGetter: clientGetter,
Store: &partition.Store{
Partitioner: &rbacPartitioner{
proxyStore: &Store{
clientGetter: clientGetter,
},
},
},
asl: lookup,

View File

@ -2,17 +2,23 @@ package proxy
import (
"context"
"fmt"
"net/http"
"sort"
"strconv"
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schemaserver/types"
"golang.org/x/sync/errgroup"
"github.com/rancher/steve/pkg/server/store/partition"
"k8s.io/apimachinery/pkg/util/sets"
)
var (
passthroughPartitions = []partition.Partition{
Partition{Passthrough: true},
}
)
type filterKey struct{}
func AddNamespaceConstraint(req *http.Request, names ...string) *http.Request {
@ -26,24 +32,96 @@ func getNamespaceConstraint(req *http.Request) (sets.String, bool) {
return set, ok
}
type RBACStore struct {
*Store
}
type Partition struct {
Namespace string
All bool
Names sets.String
Namespace string
All bool
Passthrough bool
Names sets.String
}
func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]Partition, bool) {
func (p Partition) Name() string {
return p.Namespace
}
type rbacPartitioner struct {
proxyStore *Store
}
func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) {
switch verb {
case "get":
fallthrough
case "update":
fallthrough
case "delete":
return passthroughPartitions[0], nil
default:
return nil, fmt.Errorf("invalid verb %s", verb)
}
}
func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, error) {
switch verb {
case "list":
fallthrough
case "watch":
partitions, passthrough := isPassthrough(apiOp, schema, verb)
if passthrough {
return passthroughPartitions, nil
}
sort.Slice(partitions, func(i, j int) bool {
return partitions[i].(Partition).Namespace < partitions[j].(Partition).Namespace
})
return partitions, nil
default:
return nil, fmt.Errorf("invalid verb %s", verb)
}
}
func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) {
return &byNameOrNamespaceStore{
Store: p.proxyStore,
partition: partition.(Partition),
}, nil
}
type byNameOrNamespaceStore struct {
*Store
partition Partition
}
func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
if b.partition.Passthrough {
return b.Store.List(apiOp, schema)
}
apiOp.Namespace = b.partition.Namespace
if b.partition.All {
return b.Store.List(apiOp, schema)
}
return b.Store.ByNames(apiOp, schema, b.partition.Names)
}
func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
if b.partition.Passthrough {
return b.Store.Watch(apiOp, schema, wr)
}
apiOp.Namespace = b.partition.Namespace
if b.partition.All {
return b.Store.Watch(apiOp, schema, wr)
}
return b.Store.WatchNames(apiOp, schema, wr, b.partition.Names)
}
func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) {
partitions, passthrough := isPassthroughUnconstrained(apiOp, schema, verb)
namespaces, ok := getNamespaceConstraint(apiOp.Request)
if !ok {
return partitions, passthrough
}
var result []Partition
var result []partition.Partition
if passthrough {
for namespace := range namespaces {
@ -56,7 +134,7 @@ func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string
}
for _, partition := range partitions {
if namespaces.Has(partition.Namespace) {
if namespaces.Has(partition.Name()) {
result = append(result, partition)
}
}
@ -64,7 +142,7 @@ func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string
return result, false
}
func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]Partition, bool) {
func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) {
accessListByVerb, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb)
if accessListByVerb.All(verb) {
return nil, true
@ -75,8 +153,8 @@ func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema
if resources[apiOp.Namespace].All {
return nil, true
} else {
return []Partition{
{
return []partition.Partition{
Partition{
Namespace: apiOp.Namespace,
Names: resources[apiOp.Namespace].Names,
},
@ -84,7 +162,7 @@ func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema
}
}
var result []Partition
var result []partition.Partition
if attributes.Namespaced(schema) {
for k, v := range resources {
@ -105,120 +183,3 @@ func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema
return result, false
}
func (r *RBACStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
partitions, passthrough := isPassthrough(apiOp, schema, "list")
if passthrough {
return r.Store.List(apiOp, schema)
}
resume := apiOp.Request.URL.Query().Get("continue")
limit := getLimit(apiOp.Request)
sort.Slice(partitions, func(i, j int) bool {
return partitions[i].Namespace < partitions[j].Namespace
})
lister := &ParallelPartitionLister{
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) {
return r.list(apiOp, schema, partition, cont, revision, limit)
},
Concurrency: 3,
Partitions: partitions,
}
result := types.APIObjectList{}
items, err := lister.List(apiOp.Context(), limit, resume)
if err != nil {
return result, err
}
for item := range items {
result.Objects = append(result.Objects, item...)
}
result.Continue = lister.Continue()
result.Revision = lister.Revision()
return result, lister.Err()
}
func getLimit(req *http.Request) int {
limitString := req.URL.Query().Get("limit")
limit, err := strconv.Atoi(limitString)
if err != nil {
limit = 0
}
if limit <= 0 {
limit = 100000
}
return limit
}
func (r *RBACStore) list(apiOp *types.APIRequest, schema *types.APISchema, partition Partition, cont, revision string, limit int) (types.APIObjectList, error) {
req := *apiOp
req.Namespace = partition.Namespace
req.Request = req.Request.Clone(apiOp.Context())
values := req.Request.URL.Query()
values.Set("continue", cont)
values.Set("revision", revision)
if limit > 0 {
values.Set("limit", strconv.Itoa(limit))
} else {
values.Del("limit")
}
req.Request.URL.RawQuery = values.Encode()
if partition.All {
return r.Store.List(&req, schema)
}
return r.Store.ByNames(&req, schema, partition.Names)
}
func (r *RBACStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
partitions, passthrough := isPassthrough(apiOp, schema, "watch")
if passthrough {
return r.Store.Watch(apiOp, schema, w)
}
ctx, cancel := context.WithCancel(apiOp.Context())
defer cancel()
apiOp = apiOp.WithContext(ctx)
eg := errgroup.Group{}
response := make(chan types.APIEvent)
for _, partition := range partitions {
partition := partition
eg.Go(func() error {
defer cancel()
var (
c chan types.APIEvent
err error
)
req := *apiOp
req.Namespace = partition.Namespace
if partition.All {
c, err = r.Store.Watch(&req, schema, w)
} else {
c, err = r.Store.WatchNames(&req, schema, w, partition.Names)
}
if err != nil {
return err
}
for i := range c {
response <- i
}
return nil
})
}
go func() {
defer close(response)
<-ctx.Done()
eg.Wait()
}()
return response, nil
}

View File

@ -0,0 +1,29 @@
package selector
import (
"github.com/rancher/steve/pkg/schemaserver/types"
"k8s.io/apimachinery/pkg/labels"
)
type Store struct {
types.Store
Selector labels.Selector
}
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
return s.Store.List(s.addSelector(apiOp), schema)
}
func (s *Store) addSelector(apiOp *types.APIRequest) *types.APIRequest {
apiOp = apiOp.Clone()
apiOp.Request = apiOp.Request.Clone(apiOp.Context())
q := apiOp.Request.URL.Query()
q.Add("labelSelector", s.Selector.String())
apiOp.Request.URL.RawQuery = q.Encode()
return apiOp
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
return s.Store.Watch(s.addSelector(apiOp), schema, w)
}

View File

@ -0,0 +1,61 @@
package switchschema
import (
"github.com/rancher/steve/pkg/schemaserver/types"
)
type Store struct {
Schema *types.APISchema
}
func (e *Store) Delete(apiOp *types.APIRequest, oldSchema *types.APISchema, id string) (types.APIObject, error) {
obj, err := e.Schema.Store.Delete(apiOp, e.Schema, id)
obj.Type = oldSchema.ID
return obj, err
}
func (e *Store) ByID(apiOp *types.APIRequest, oldSchema *types.APISchema, id string) (types.APIObject, error) {
obj, err := e.Schema.Store.ByID(apiOp, e.Schema, id)
obj.Type = oldSchema.ID
return obj, err
}
func (e *Store) List(apiOp *types.APIRequest, oldSchema *types.APISchema) (types.APIObjectList, error) {
obj, err := e.Schema.Store.List(apiOp, e.Schema)
for i := range obj.Objects {
obj.Objects[i].Type = oldSchema.ID
}
return obj, err
}
func (e *Store) Create(apiOp *types.APIRequest, oldSchema *types.APISchema, data types.APIObject) (types.APIObject, error) {
obj, err := e.Schema.Store.Create(apiOp, e.Schema, data)
obj.Type = oldSchema.ID
return obj, err
}
func (e *Store) Update(apiOp *types.APIRequest, oldSchema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
obj, err := e.Schema.Store.Update(apiOp, e.Schema, data, id)
obj.Type = oldSchema.ID
return obj, err
}
func (e *Store) Watch(apiOp *types.APIRequest, oldSchema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
c, err := e.Schema.Store.Watch(apiOp, e.Schema, wr)
if err != nil || c == nil {
return c, err
}
result := make(chan types.APIEvent)
go func() {
defer close(result)
for obj := range c {
if obj.Object.Type == e.Schema.ID {
obj.Object.Type = oldSchema.ID
}
result <- obj
}
}()
return result, nil
}