diff --git a/pkg/server/resources/helm/formatter.go b/pkg/server/resources/helm/formatter.go new file mode 100644 index 0000000..29284fc --- /dev/null +++ b/pkg/server/resources/helm/formatter.go @@ -0,0 +1,58 @@ +package helm + +import ( + "github.com/rancher/steve/pkg/attributes" + "github.com/rancher/steve/pkg/schema/converter" + "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func DropHelmData(request *types.APIRequest, resource *types.RawResource) { + data := resource.APIObject.Data() + if data.String("metadata", "labels", "owner") == "helm" || + data.String("metadata", "labels", "OWNER") == "TILLER" { + if data.String("data", "release") != "" { + delete(data.Map("data"), "release") + } + } +} + +func FormatRelease(request *types.APIRequest, resource *types.RawResource) { + obj, ok := resource.APIObject.Object.(runtime.Object) + if !ok { + return + } + + release, err := ToRelease(obj, SchemeBasedNamespaceLookup(request.Schemas)) + if err == ErrNotHelmRelease { + return + } else if err != nil { + logrus.Errorf("failed to render helm release: %v", err) + return + } + + var ( + data = resource.APIObject.Data() + namespace = data.String("metadata", "namespace") + name = data.String("metadata", "name") + ) + + switch data.String("kind") { + case "Secret": + resource.ID = namespace + "/s:" + name + case "ConfigMap": + resource.ID = namespace + "/c:" + name + } + + resource.Links["self"] = request.URLBuilder.ResourceLink(request.Schema, resource.ID) + resource.APIObject.Object = release +} + +func SchemeBasedNamespaceLookup(schemas *types.APISchemas) IsNamespaced { + return func(gvk schema.GroupVersionKind) bool { + schema := schemas.LookupSchema(converter.GVKToSchemaID(gvk)) + return schema != nil && attributes.Namespaced(schema) + } +} diff --git a/pkg/server/resources/helm/helm2.go b/pkg/server/resources/helm/helm2.go new file mode 100644 index 0000000..4afdd44 --- /dev/null +++ b/pkg/server/resources/helm/helm2.go @@ -0,0 +1,158 @@ +package helm + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "io/ioutil" + "strings" + "time" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/sirupsen/logrus" + rspb "k8s.io/helm/pkg/proto/hapi/release" + "sigs.k8s.io/yaml" +) + +var ( + readmes = map[string]bool{ + "readme": true, + "readme.txt": true, + "readme.md": true, + } + statusMapping = map[string]Status{ + "UNKNOWN": StatusUnknown, + "DEPLOYED": StatusDeployed, + "DELETED": StatusUninstalled, + "SUPERSEDED": StatusSuperseded, + "FAILED": StatusFailed, + "DELETING": StatusUninstalling, + "PENDING_INSTALL": StatusPendingInstall, + "PENDING_UPGRADE": StatusPendingUpgrade, + "PENDING_ROLLBACK": StatusPendingRollback, + } +) + +func isHelm2(labels map[string]string) bool { + return labels["OWNER"] == "TILLER" +} + +func fromHelm2Data(data string, isNamespaced IsNamespaced) (*Release, error) { + release, err := decodeHelm2(data) + if err != nil { + return nil, err + } + + return fromHelm2ReleaseToRelease(release, isNamespaced) +} + +func toTime(t *timestamp.Timestamp) time.Time { + return time.Unix(t.GetSeconds(), int64(t.GetNanos())).UTC() +} + +func fromHelm2ReleaseToRelease(release *rspb.Release, isNamespaced IsNamespaced) (*Release, error) { + var ( + err error + ) + + hr := &Release{ + Name: release.Name, + Info: &Info{ + FirstDeployed: toTime(release.GetInfo().GetFirstDeployed()), + LastDeployed: toTime(release.GetInfo().GetLastDeployed()), + Deleted: toTime(release.GetInfo().GetDeleted()), + Description: release.GetInfo().GetDescription(), + Status: statusMapping[release.GetInfo().GetStatus().GetCode().String()], + Notes: release.GetInfo().GetStatus().GetNotes(), + }, + Chart: &Chart{ + Values: toMap(release.Namespace, release.Name, release.GetChart().GetValues().GetRaw()), + Metadata: &Metadata{ + Name: release.GetChart().GetMetadata().GetName(), + Home: release.GetChart().GetMetadata().GetHome(), + Sources: release.GetChart().GetMetadata().GetSources(), + Version: release.GetChart().GetMetadata().GetVersion(), + Description: release.GetChart().GetMetadata().GetDescription(), + Keywords: release.GetChart().GetMetadata().GetKeywords(), + Icon: release.GetChart().GetMetadata().GetIcon(), + Condition: release.GetChart().GetMetadata().GetCondition(), + Tags: release.GetChart().GetMetadata().GetTags(), + AppVersion: release.GetChart().GetMetadata().GetAppVersion(), + Deprecated: release.GetChart().GetMetadata().GetDeprecated(), + Annotations: release.GetChart().GetMetadata().GetAnnotations(), + KubeVersion: release.GetChart().GetMetadata().GetKubeVersion(), + }, + }, + Values: toMap(release.Namespace, release.Name, release.GetConfig().GetRaw()), + Version: int(release.Version), + Namespace: release.Namespace, + HelmMajorVersion: 3, + } + + for _, m := range release.GetChart().GetMetadata().GetMaintainers() { + if m == nil { + continue + } + hr.Chart.Metadata.Maintainers = append(hr.Chart.Metadata.Maintainers, Maintainer{ + Name: m.GetName(), + Email: m.GetEmail(), + URL: m.GetUrl(), + }) + } + + for _, f := range release.GetChart().GetFiles() { + if f == nil { + continue + } + if readmes[strings.ToLower(f.TypeUrl)] { + hr.Info.Readme = string(f.Value) + } + } + + hr.Resources, err = resourcesFromManifest(release.Namespace, release.Manifest, isNamespaced) + return hr, err +} + +func toMap(namespace, name string, manifest string) map[string]interface{} { + values := map[string]interface{}{} + + if manifest == "" { + return values + } + + if err := yaml.Unmarshal([]byte(manifest), &values); err != nil { + logrus.Errorf("failed to unmarshal yaml for %s/%s", namespace, name) + } + + return values +} + +func decodeHelm2(data string) (*rspb.Release, error) { + b, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return nil, err + } + + // For backwards compatibility with releases that were stored before + // compression was introduced we skip decompression if the + // gzip magic header is not found + if bytes.Equal(b[0:3], magicGzip) { + r, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, err + } + b2, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + b = b2 + } + + var rls rspb.Release + // unmarshal protobuf bytes + if err := proto.Unmarshal(b, &rls); err != nil { + return nil, err + } + return &rls, nil +} diff --git a/pkg/server/resources/helm/helm3.go b/pkg/server/resources/helm/helm3.go new file mode 100644 index 0000000..b056a22 --- /dev/null +++ b/pkg/server/resources/helm/helm3.go @@ -0,0 +1,132 @@ +package helm + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "encoding/json" + "io/ioutil" + "strings" + + "helm.sh/helm/v3/pkg/release" +) + +func isHelm3(labels map[string]string) bool { + return labels["owner"] == "helm" +} + +func fromHelm3Data(data string, isNamespaced IsNamespaced) (*Release, error) { + release, err := decodeHelm3(data) + if err != nil { + return nil, err + } + + return fromHelm3ReleaseToRelease(release, isNamespaced) +} + +func fromHelm3ReleaseToRelease(release *release.Release, isNamespaced IsNamespaced) (*Release, error) { + var ( + info = &Info{} + chart = &Chart{} + err error + ) + + if release.Info != nil { + info = &Info{ + FirstDeployed: release.Info.FirstDeployed.Time, + LastDeployed: release.Info.LastDeployed.Time, + Deleted: release.Info.Deleted.Time, + Description: release.Info.Description, + Status: Status(release.Info.Status), + Notes: release.Info.Notes, + } + } + + if release.Chart != nil { + chart = &Chart{ + Values: release.Chart.Values, + } + if release.Chart.Metadata != nil { + chart.Metadata = &Metadata{ + Name: release.Chart.Metadata.Name, + Home: release.Chart.Metadata.Home, + Sources: release.Chart.Metadata.Sources, + Version: release.Chart.Metadata.Version, + Description: release.Chart.Metadata.Description, + Keywords: release.Chart.Metadata.Keywords, + Icon: release.Chart.Metadata.Icon, + APIVersion: release.Chart.Metadata.APIVersion, + Condition: release.Chart.Metadata.Condition, + Tags: release.Chart.Metadata.Tags, + AppVersion: release.Chart.Metadata.AppVersion, + Deprecated: release.Chart.Metadata.Deprecated, + Annotations: release.Chart.Metadata.Annotations, + KubeVersion: release.Chart.Metadata.KubeVersion, + Type: release.Chart.Metadata.Type, + } + + for _, m := range release.Chart.Metadata.Maintainers { + if m == nil { + continue + } + chart.Metadata.Maintainers = append(chart.Metadata.Maintainers, Maintainer{ + Name: m.Name, + Email: m.Email, + URL: m.URL, + }) + } + } + + for _, f := range release.Chart.Files { + if f == nil { + continue + } + if readmes[strings.ToLower(f.Name)] { + info.Readme = string(f.Data) + } + } + } + + hr := &Release{ + Name: release.Name, + Info: info, + Chart: chart, + Values: release.Config, + Resources: nil, + Version: release.Version, + Namespace: release.Namespace, + HelmMajorVersion: 3, + } + + hr.Resources, err = resourcesFromManifest(release.Namespace, release.Manifest, isNamespaced) + return hr, err +} + +func decodeHelm3(data string) (*release.Release, error) { + b, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return nil, err + } + + // For backwards compatibility with releases that were stored before + // compression was introduced we skip decompression if the + // gzip magic header is not found + if bytes.Equal(b[0:3], magicGzip) { + r, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, err + } + b2, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + b = b2 + } + + var rls release.Release + // unmarshal release object bytes + if err := json.Unmarshal(b, &rls); err != nil { + return nil, err + } + return &rls, nil +} diff --git a/pkg/server/resources/helm/register.go b/pkg/server/resources/helm/register.go new file mode 100644 index 0000000..b3af89b --- /dev/null +++ b/pkg/server/resources/helm/register.go @@ -0,0 +1,20 @@ +package helm + +import ( + "net/http" + + "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/rancher/steve/pkg/server/store/partition" +) + +func Register(schemas *types.APISchemas) { + schemas.InternalSchemas.TypeName("helmrelease", Release{}) + schemas.MustImportAndCustomize(Release{}, func(schema *types.APISchema) { + schema.CollectionMethods = []string{http.MethodGet} + schema.ResourceMethods = []string{http.MethodGet} + schema.Store = &partition.Store{ + Partitioner: &partitioner{}, + } + schema.Formatter = FormatRelease + }) +} diff --git a/pkg/server/resources/helm/release.go b/pkg/server/resources/helm/release.go new file mode 100644 index 0000000..39a0e3f --- /dev/null +++ b/pkg/server/resources/helm/release.go @@ -0,0 +1,93 @@ +package helm + +import ( + "bytes" + "encoding/base64" + "errors" + + "github.com/rancher/wrangler/pkg/data" + "github.com/rancher/wrangler/pkg/yaml" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + meta2 "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + ErrNotHelmRelease = errors.New("not helm release") + magicGzip = []byte{0x1f, 0x8b, 0x08} +) + +type IsNamespaced func(gvk schema.GroupVersionKind) bool + +func ToRelease(obj runtime.Object, isNamespaced IsNamespaced) (*Release, error) { + releaseData, err := getReleaseDataAndKind(obj) + if err != nil { + return nil, err + } + + meta, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + + switch { + case isHelm3(meta.GetLabels()): + return fromHelm3Data(releaseData, isNamespaced) + case isHelm2(meta.GetLabels()): + return fromHelm2Data(releaseData, isNamespaced) + } + + return nil, ErrNotHelmRelease +} + +func getReleaseDataAndKind(obj runtime.Object) (string, error) { + switch t := obj.(type) { + case *unstructured.Unstructured: + releaseData := data.Object(t.Object).String("data", "release") + switch t.GetKind() { + case "ConfigMap": + return releaseData, nil + case "Secret": + data, err := base64.StdEncoding.DecodeString(releaseData) + if err != nil { + return "", err + } + return string(data), nil + } + case *corev1.ConfigMap: + return t.Data["release"], nil + case *corev1.Secret: + return string(t.Data["release"]), nil + } + + return "", ErrNotHelmRelease +} + +func resourcesFromManifest(namespace string, manifest string, isNamespaced IsNamespaced) (result []Resource, err error) { + objs, err := yaml.ToObjects(bytes.NewReader([]byte(manifest))) + if err != nil { + return nil, err + } + + for _, obj := range objs { + meta, err := meta2.Accessor(obj) + if err != nil { + return nil, err + } + r := Resource{ + Name: meta.GetName(), + Namespace: meta.GetNamespace(), + } + gvk := obj.GetObjectKind().GroupVersionKind() + if isNamespaced != nil && isNamespaced(gvk) && r.Namespace == "" { + r.Namespace = namespace + } + r.APIVersion, r.Kind = gvk.ToAPIVersionAndKind() + result = append(result, r) + } + + return result, nil +} diff --git a/pkg/server/resources/helm/store.go b/pkg/server/resources/helm/store.go new file mode 100644 index 0000000..d738088 --- /dev/null +++ b/pkg/server/resources/helm/store.go @@ -0,0 +1,104 @@ +package helm + +import ( + "strings" + + "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/rancher/steve/pkg/server/store/partition" + "github.com/rancher/steve/pkg/server/store/selector" + "github.com/rancher/steve/pkg/server/store/switchschema" + "github.com/rancher/wrangler/pkg/schemas/validation" + "k8s.io/apimachinery/pkg/labels" +) + +var ( + configMap2 = target{ + schemaType: "configmap", + version: "2", + selector: labels.SelectorFromSet(labels.Set{ + "OWNER": "TILLER", + }), + } + secret2 = target{ + schemaType: "secret", + version: "2", + selector: labels.SelectorFromSet(labels.Set{ + "OWNER": "TILLER", + }), + } + secret3 = target{ + schemaType: "secret", + version: "3", + selector: labels.SelectorFromSet(labels.Set{ + "owner": "helm", + }), + } + all = []partition.Partition{ + configMap2, + secret2, + secret3, + } +) + +type target struct { + schemaType string + version string + selector labels.Selector +} + +func (t target) Name() string { + return t.schemaType + t.version +} + +type partitioner struct { +} + +func (p *partitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) { + if id == "" { + return nil, validation.Unauthorized + } + t := strings.SplitN(id, ":", 2)[0] + if t == "c" { + return configMap2, nil + } else if t == "s" { + return secret2, nil + } + return nil, validation.NotFound +} + +func (p *partitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, error) { + return all, nil +} + +func (p *partitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { + target := partition.(target) + schema := apiOp.Schemas.LookupSchema(target.schemaType) + return &stripIDPrefix{ + Store: &selector.Store{ + Selector: target.selector, + Store: &switchschema.Store{ + Schema: schema, + }, + }, + }, nil +} + +type stripIDPrefix struct { + types.Store +} + +func stripPrefix(s string) string { + return strings.TrimPrefix(strings.TrimPrefix(s, "c:"), "s:") +} + +func (s *stripIDPrefix) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { + return s.Store.Delete(apiOp, schema, stripPrefix(id)) +} + +func (s *stripIDPrefix) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { + return s.Store.ByID(apiOp, schema, stripPrefix(id)) +} + +func (s *stripIDPrefix) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { + return s.Store.Update(apiOp, schema, data, stripPrefix(id)) +} diff --git a/pkg/server/resources/helm/types.go b/pkg/server/resources/helm/types.go new file mode 100644 index 0000000..a915160 --- /dev/null +++ b/pkg/server/resources/helm/types.go @@ -0,0 +1,130 @@ +package helm + +import ( + "time" +) + +type Release struct { + // Name is the name of the release + Name string `json:"name,omitempty"` + // Info provides information about a release + Info *Info `json:"info,omitempty"` + // Chart is the chart that was released. + Chart *Chart `json:"chart,omitempty"` + // Config is the set of extra Values added to the chart. + // These values override the default values inside of the chart. + Values map[string]interface{} `json:"values,omitempty"` + // Manifest is the string representation of the rendered template. + Resources []Resource `json:"resources,omitempty"` + // Version is an int which represents the version of the release. + Version int `json:"version,omitempty"` + // Namespace is the kubernetes namespace of the release. + Namespace string `json:"namespace,omitempty"` + + HelmMajorVersion int `json:"helmVersion,omitempty"` +} + +type Resource struct { + APIVersion string `json:"apiVersion"` + Kind string `json:"kind"` + Name string `json:"name"` + Namespace string `json:"namespace"` +} + +// Chart is a helm package that contains metadata, a default config, zero or more +// optionally parameterizable templates, and zero or more charts (dependencies). +type Chart struct { + // Metadata is the contents of the Chartfile. + Metadata *Metadata `json:"metadata"` + // Values are default config for this chart. + Values map[string]interface{} `json:"values"` +} + +// Metadata for a Chart file. This models the structure of a Chart.yaml file. +type Metadata struct { + // The name of the chart + Name string `json:"name,omitempty"` + // The URL to a relevant project page, git repo, or contact person + Home string `json:"home,omitempty"` + // Source is the URL to the source code of this chart + Sources []string `json:"sources,omitempty"` + // A SemVer 2 conformant version string of the chart + Version string `json:"version,omitempty"` + // A one-sentence description of the chart + Description string `json:"description,omitempty"` + // A list of string keywords + Keywords []string `json:"keywords,omitempty"` + // A list of name and URL/email address combinations for the maintainer(s) + Maintainers []Maintainer `json:"maintainers,omitempty"` + // The URL to an icon file. + Icon string `json:"icon,omitempty"` + // The API Version of this chart. + APIVersion string `json:"apiVersion,omitempty"` + // The condition to check to enable chart + Condition string `json:"condition,omitempty"` + // The tags to check to enable chart + Tags string `json:"tags,omitempty"` + // The version of the application enclosed inside of this chart. + AppVersion string `json:"appVersion,omitempty"` + // Whether or not this chart is deprecated + Deprecated bool `json:"deprecated,omitempty"` + // Annotations are additional mappings uninterpreted by Helm, + // made available for inspection by other applications. + Annotations map[string]string `json:"annotations,omitempty"` + // KubeVersion is a SemVer constraint specifying the version of Kubernetes required. + KubeVersion string `json:"kubeVersion,omitempty"` + // Specifies the chart type: application or library + Type string `json:"type,omitempty"` +} + +// Maintainer describes a Chart maintainer. +type Maintainer struct { + // Name is a user name or organization name + Name string `json:"name,omitempty"` + // Email is an optional email address to contact the named maintainer + Email string `json:"email,omitempty"` + // URL is an optional URL to an address for the named maintainer + URL string `json:"url,omitempty"` +} + +// Info describes release information. +type Info struct { + // FirstDeployed is when the release was first deployed. + FirstDeployed time.Time `json:"firstDeployed,omitempty"` + // LastDeployed is when the release was last deployed. + LastDeployed time.Time `json:"lastDeployed,omitempty"` + // Deleted tracks when this object was deleted. + Deleted time.Time `json:"deleted"` + // Description is human-friendly "log entry" about this release. + Description string `json:"description,omitempty"` + // Status is the current state of the release + Status Status `json:"status,omitempty" wrangler:"options=unknown|deployed|uninstalled|superseded|failed|uninstalling|pending-install|pending-upgrade|pending-rollback"` + // Contains the rendered templates/NOTES.txt if available + Notes string `json:"notes,omitempty"` + Readme string `json:"readme,omitempty"` +} + +type Status string + +// Describe the status of a release +// NOTE: Make sure to update cmd/helm/status.go when adding or modifying any of these statuses. +const ( + // StatusUnknown indicates that a release is in an uncertain state. + StatusUnknown Status = "unknown" + // StatusDeployed indicates that the release has been pushed to Kubernetes. + StatusDeployed Status = "deployed" + // StatusUninstalled indicates that a release has been uninstalled from Kubernetes. + StatusUninstalled Status = "uninstalled" + // StatusSuperseded indicates that this release object is outdated and a newer one exists. + StatusSuperseded Status = "superseded" + // StatusFailed indicates that the release was not successfully deployed. + StatusFailed Status = "failed" + // StatusUninstalling indicates that a uninstall operation is underway. + StatusUninstalling Status = "uninstalling" + // StatusPendingInstall indicates that an install operation is underway. + StatusPendingInstall Status = "pending-install" + // StatusPendingUpgrade indicates that an upgrade operation is underway. + StatusPendingUpgrade Status = "pending-upgrade" + // StatusPendingRollback indicates that an rollback operation is underway. + StatusPendingRollback Status = "pending-rollback" +) diff --git a/pkg/server/resources/schema.go b/pkg/server/resources/schema.go index 075655a..6edb84e 100644 --- a/pkg/server/resources/schema.go +++ b/pkg/server/resources/schema.go @@ -14,6 +14,7 @@ import ( "github.com/rancher/steve/pkg/server/resources/clusters" "github.com/rancher/steve/pkg/server/resources/common" "github.com/rancher/steve/pkg/server/resources/counts" + "github.com/rancher/steve/pkg/server/resources/helm" "github.com/rancher/steve/pkg/server/resources/userpreferences" "github.com/rancher/steve/pkg/server/store/proxy" "k8s.io/client-go/discovery" @@ -25,6 +26,7 @@ func DefaultSchemas(ctx context.Context, baseSchema *types.APISchemas, ccache cl apiroot.Register(baseSchema, []string{"v1"}, []string{"proxy:/apis"}) userpreferences.Register(baseSchema, cg) clusters.Register(ctx, baseSchema, cg, ccache) + helm.Register(baseSchema) return baseSchema } @@ -32,5 +34,13 @@ func DefaultSchemaTemplates(cf *client.Factory, lookup accesscontrol.AccessSetLo return []schema.Template{ common.DefaultTemplate(cf, lookup), apigroups.Template(discovery), + { + ID: "configmap", + Formatter: helm.DropHelmData, + }, + { + ID: "secret", + Formatter: helm.DropHelmData, + }, } } diff --git a/pkg/server/store/proxy/con_eg.go b/pkg/server/store/partition/parallel.go similarity index 80% rename from pkg/server/store/proxy/con_eg.go rename to pkg/server/store/partition/parallel.go index 56ac425..bbf1ea5 100644 --- a/pkg/server/store/proxy/con_eg.go +++ b/pkg/server/store/partition/parallel.go @@ -1,4 +1,4 @@ -package proxy +package partition import ( "context" @@ -10,6 +10,10 @@ import ( "golang.org/x/sync/semaphore" ) +type Partition interface { + Name() string +} + type ParallelPartitionLister struct { Lister PartitionLister Concurrency int64 @@ -40,12 +44,12 @@ func (p *ParallelPartitionLister) Continue() string { return base64.StdEncoding.EncodeToString(bytes) } -func indexOrZero(partitions []Partition, namespace string) int { - if namespace == "" { +func indexOrZero(partitions []Partition, name string) int { + if name == "" { return 0 } for i, partition := range partitions { - if partition.Namespace == namespace { + if partition.Name() == name { return i } } @@ -74,11 +78,11 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st } type listState struct { - Revision string `json:"r,omitempty"` - PartitionNamespace string `json:"p,omitempty"` - Continue string `json:"c,omitempty"` - Offset int `json:"o,omitempty"` - Limit int `json:"l,omitempty"` + Revision string `json:"r,omitempty"` + PartitionName string `json:"p,omitempty"` + Continue string `json:"c,omitempty"` + Offset int `json:"o,omitempty"` + Limit int `json:"l,omitempty"` } func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) { @@ -97,7 +101,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l close(result) }() - for i := indexOrZero(p.Partitions, state.PartitionNamespace); i < len(p.Partitions); i++ { + for i := indexOrZero(p.Partitions, state.PartitionName); i < len(p.Partitions); i++ { if capacity <= 0 || isDone(ctx) { break } @@ -129,7 +133,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l for { cont := "" - if partition.Namespace == state.PartitionNamespace { + if partition.Name() == state.PartitionName { cont = state.Continue } list, err := p.Lister(ctx, partition, cont, state.Revision, limit) @@ -150,7 +154,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l p.revision = list.Revision } - if state.PartitionNamespace == partition.Namespace && state.Offset > 0 && state.Offset < len(list.Objects) { + if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) { list.Objects = list.Objects[state.Offset:] } @@ -158,11 +162,11 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l result <- list.Objects[:capacity] // save state to redo this list at this offset p.state = &listState{ - Revision: list.Revision, - PartitionNamespace: partition.Namespace, - Continue: cont, - Offset: capacity, - Limit: limit, + Revision: list.Revision, + PartitionName: partition.Name(), + Continue: cont, + Offset: capacity, + Limit: limit, } capacity = 0 return nil @@ -174,7 +178,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l } // loop again and get more data state.Continue = list.Continue - state.PartitionNamespace = partition.Namespace + state.PartitionName = partition.Name() state.Offset = 0 } } diff --git a/pkg/server/store/partition/store.go b/pkg/server/store/partition/store.go new file mode 100644 index 0000000..fd9220e --- /dev/null +++ b/pkg/server/store/partition/store.go @@ -0,0 +1,176 @@ +package partition + +import ( + "context" + "net/http" + "strconv" + + "github.com/rancher/steve/pkg/schemaserver/types" + "golang.org/x/sync/errgroup" +) + +type Partitioner interface { + Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error) + All(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]Partition, error) + Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) +} + +type Store struct { + Partitioner Partitioner +} + +func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) { + p, err := s.Partitioner.Lookup(apiOp, schema, verb, id) + if err != nil { + return nil, err + } + + return s.Partitioner.Store(apiOp, p) +} + +func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { + target, err := s.getStore(apiOp, schema, "delete", id) + if err != nil { + return types.APIObject{}, err + } + + return target.Delete(apiOp, schema, id) +} + +func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { + target, err := s.getStore(apiOp, schema, "get", id) + if err != nil { + return types.APIObject{}, err + } + + return target.ByID(apiOp, schema, id) +} + +func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition, + cont string, revision string, limit int) (types.APIObjectList, error) { + store, err := s.Partitioner.Store(apiOp, partition) + if err != nil { + return types.APIObjectList{}, err + } + + req := apiOp.Clone() + req.Request = req.Request.Clone(ctx) + + values := req.Request.URL.Query() + values.Set("continue", cont) + values.Set("revision", revision) + if limit > 0 { + values.Set("limit", strconv.Itoa(limit)) + } else { + values.Del("limit") + } + req.Request.URL.RawQuery = values.Encode() + + return store.List(req, schema) +} + +func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { + var ( + result types.APIObjectList + ) + + paritions, err := s.Partitioner.All(apiOp, schema, "list") + if err != nil { + return result, err + } + + lister := ParallelPartitionLister{ + Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) { + return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) + }, + Concurrency: 3, + Partitions: paritions, + } + + resume := apiOp.Request.URL.Query().Get("continue") + limit := getLimit(apiOp.Request) + + list, err := lister.List(apiOp.Context(), limit, resume) + if err != nil { + return result, err + } + + for items := range list { + result.Objects = append(result.Objects, items...) + } + + result.Revision = lister.Revision() + result.Continue = lister.Continue() + return result, nil +} + +func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { + target, err := s.getStore(apiOp, schema, "create", "") + if err != nil { + return types.APIObject{}, err + } + + return target.Create(apiOp, schema, data) +} + +func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { + target, err := s.getStore(apiOp, schema, "update", id) + if err != nil { + return types.APIObject{}, err + } + + return target.Update(apiOp, schema, data, id) +} + +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { + partitions, err := s.Partitioner.All(apiOp, schema, "watch") + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(apiOp.Context()) + defer cancel() + apiOp = apiOp.WithContext(ctx) + + eg := errgroup.Group{} + response := make(chan types.APIEvent) + + for _, partition := range partitions { + store, err := s.Partitioner.Store(apiOp, partition) + if err != nil { + return nil, err + } + + eg.Go(func() error { + defer cancel() + c, err := store.Watch(apiOp, schema, wr) + if err != nil { + return err + } + for i := range c { + response <- i + } + return nil + }) + } + + go func() { + defer close(response) + <-ctx.Done() + eg.Wait() + }() + + return response, nil +} + +func getLimit(req *http.Request) int { + limitString := req.URL.Query().Get("limit") + limit, err := strconv.Atoi(limitString) + if err != nil { + limit = 0 + } + if limit <= 0 { + limit = 100000 + } + return limit +} diff --git a/pkg/server/store/proxy/proxy_store.go b/pkg/server/store/proxy/proxy_store.go index 1344d26..02e188c 100644 --- a/pkg/server/store/proxy/proxy_store.go +++ b/pkg/server/store/proxy/proxy_store.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/rancher/steve/pkg/server/store/partition" "github.com/rancher/wrangler/pkg/data" "github.com/rancher/wrangler/pkg/schemas/validation" "github.com/sirupsen/logrus" @@ -50,9 +51,11 @@ type Store struct { func NewProxyStore(clientGetter ClientGetter, lookup accesscontrol.AccessSetLookup) types.Store { return &errorStore{ Store: &WatchRefresh{ - Store: &RBACStore{ - Store: &Store{ - clientGetter: clientGetter, + Store: &partition.Store{ + Partitioner: &rbacPartitioner{ + proxyStore: &Store{ + clientGetter: clientGetter, + }, }, }, asl: lookup, diff --git a/pkg/server/store/proxy/rbac_store.go b/pkg/server/store/proxy/rbac_store.go index 65ebfda..4878070 100644 --- a/pkg/server/store/proxy/rbac_store.go +++ b/pkg/server/store/proxy/rbac_store.go @@ -2,17 +2,23 @@ package proxy import ( "context" + "fmt" "net/http" "sort" - "strconv" "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/schemaserver/types" - "golang.org/x/sync/errgroup" + "github.com/rancher/steve/pkg/server/store/partition" "k8s.io/apimachinery/pkg/util/sets" ) +var ( + passthroughPartitions = []partition.Partition{ + Partition{Passthrough: true}, + } +) + type filterKey struct{} func AddNamespaceConstraint(req *http.Request, names ...string) *http.Request { @@ -26,24 +32,96 @@ func getNamespaceConstraint(req *http.Request) (sets.String, bool) { return set, ok } -type RBACStore struct { - *Store -} - type Partition struct { - Namespace string - All bool - Names sets.String + Namespace string + All bool + Passthrough bool + Names sets.String } -func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]Partition, bool) { +func (p Partition) Name() string { + return p.Namespace +} + +type rbacPartitioner struct { + proxyStore *Store +} + +func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) { + switch verb { + case "get": + fallthrough + case "update": + fallthrough + case "delete": + return passthroughPartitions[0], nil + default: + return nil, fmt.Errorf("invalid verb %s", verb) + } +} + +func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, error) { + switch verb { + case "list": + fallthrough + case "watch": + partitions, passthrough := isPassthrough(apiOp, schema, verb) + if passthrough { + return passthroughPartitions, nil + } + sort.Slice(partitions, func(i, j int) bool { + return partitions[i].(Partition).Namespace < partitions[j].(Partition).Namespace + }) + return partitions, nil + default: + return nil, fmt.Errorf("invalid verb %s", verb) + } +} + +func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { + return &byNameOrNamespaceStore{ + Store: p.proxyStore, + partition: partition.(Partition), + }, nil +} + +type byNameOrNamespaceStore struct { + *Store + partition Partition +} + +func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { + if b.partition.Passthrough { + return b.Store.List(apiOp, schema) + } + + apiOp.Namespace = b.partition.Namespace + if b.partition.All { + return b.Store.List(apiOp, schema) + } + return b.Store.ByNames(apiOp, schema, b.partition.Names) +} + +func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { + if b.partition.Passthrough { + return b.Store.Watch(apiOp, schema, wr) + } + + apiOp.Namespace = b.partition.Namespace + if b.partition.All { + return b.Store.Watch(apiOp, schema, wr) + } + return b.Store.WatchNames(apiOp, schema, wr, b.partition.Names) +} + +func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) { partitions, passthrough := isPassthroughUnconstrained(apiOp, schema, verb) namespaces, ok := getNamespaceConstraint(apiOp.Request) if !ok { return partitions, passthrough } - var result []Partition + var result []partition.Partition if passthrough { for namespace := range namespaces { @@ -56,7 +134,7 @@ func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string } for _, partition := range partitions { - if namespaces.Has(partition.Namespace) { + if namespaces.Has(partition.Name()) { result = append(result, partition) } } @@ -64,7 +142,7 @@ func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string return result, false } -func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]Partition, bool) { +func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) { accessListByVerb, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb) if accessListByVerb.All(verb) { return nil, true @@ -75,8 +153,8 @@ func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema if resources[apiOp.Namespace].All { return nil, true } else { - return []Partition{ - { + return []partition.Partition{ + Partition{ Namespace: apiOp.Namespace, Names: resources[apiOp.Namespace].Names, }, @@ -84,7 +162,7 @@ func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema } } - var result []Partition + var result []partition.Partition if attributes.Namespaced(schema) { for k, v := range resources { @@ -105,120 +183,3 @@ func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema return result, false } - -func (r *RBACStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { - partitions, passthrough := isPassthrough(apiOp, schema, "list") - if passthrough { - return r.Store.List(apiOp, schema) - } - - resume := apiOp.Request.URL.Query().Get("continue") - limit := getLimit(apiOp.Request) - - sort.Slice(partitions, func(i, j int) bool { - return partitions[i].Namespace < partitions[j].Namespace - }) - - lister := &ParallelPartitionLister{ - Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) { - return r.list(apiOp, schema, partition, cont, revision, limit) - }, - Concurrency: 3, - Partitions: partitions, - } - - result := types.APIObjectList{} - items, err := lister.List(apiOp.Context(), limit, resume) - if err != nil { - return result, err - } - - for item := range items { - result.Objects = append(result.Objects, item...) - } - - result.Continue = lister.Continue() - result.Revision = lister.Revision() - return result, lister.Err() -} - -func getLimit(req *http.Request) int { - limitString := req.URL.Query().Get("limit") - limit, err := strconv.Atoi(limitString) - if err != nil { - limit = 0 - } - if limit <= 0 { - limit = 100000 - } - return limit -} - -func (r *RBACStore) list(apiOp *types.APIRequest, schema *types.APISchema, partition Partition, cont, revision string, limit int) (types.APIObjectList, error) { - req := *apiOp - req.Namespace = partition.Namespace - req.Request = req.Request.Clone(apiOp.Context()) - - values := req.Request.URL.Query() - values.Set("continue", cont) - values.Set("revision", revision) - if limit > 0 { - values.Set("limit", strconv.Itoa(limit)) - } else { - values.Del("limit") - } - req.Request.URL.RawQuery = values.Encode() - - if partition.All { - return r.Store.List(&req, schema) - } - return r.Store.ByNames(&req, schema, partition.Names) -} - -func (r *RBACStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { - partitions, passthrough := isPassthrough(apiOp, schema, "watch") - if passthrough { - return r.Store.Watch(apiOp, schema, w) - } - - ctx, cancel := context.WithCancel(apiOp.Context()) - defer cancel() - apiOp = apiOp.WithContext(ctx) - - eg := errgroup.Group{} - response := make(chan types.APIEvent) - for _, partition := range partitions { - partition := partition - eg.Go(func() error { - defer cancel() - - var ( - c chan types.APIEvent - err error - ) - - req := *apiOp - req.Namespace = partition.Namespace - if partition.All { - c, err = r.Store.Watch(&req, schema, w) - } else { - c, err = r.Store.WatchNames(&req, schema, w, partition.Names) - } - if err != nil { - return err - } - for i := range c { - response <- i - } - return nil - }) - } - - go func() { - defer close(response) - <-ctx.Done() - eg.Wait() - }() - - return response, nil -} diff --git a/pkg/server/store/selector/selector.go b/pkg/server/store/selector/selector.go new file mode 100644 index 0000000..13ffd99 --- /dev/null +++ b/pkg/server/store/selector/selector.go @@ -0,0 +1,29 @@ +package selector + +import ( + "github.com/rancher/steve/pkg/schemaserver/types" + "k8s.io/apimachinery/pkg/labels" +) + +type Store struct { + types.Store + Selector labels.Selector +} + +func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { + return s.Store.List(s.addSelector(apiOp), schema) +} + +func (s *Store) addSelector(apiOp *types.APIRequest) *types.APIRequest { + + apiOp = apiOp.Clone() + apiOp.Request = apiOp.Request.Clone(apiOp.Context()) + q := apiOp.Request.URL.Query() + q.Add("labelSelector", s.Selector.String()) + apiOp.Request.URL.RawQuery = q.Encode() + return apiOp +} + +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { + return s.Store.Watch(s.addSelector(apiOp), schema, w) +} diff --git a/pkg/server/store/switchschema/store.go b/pkg/server/store/switchschema/store.go new file mode 100644 index 0000000..f935f35 --- /dev/null +++ b/pkg/server/store/switchschema/store.go @@ -0,0 +1,61 @@ +package switchschema + +import ( + "github.com/rancher/steve/pkg/schemaserver/types" +) + +type Store struct { + Schema *types.APISchema +} + +func (e *Store) Delete(apiOp *types.APIRequest, oldSchema *types.APISchema, id string) (types.APIObject, error) { + obj, err := e.Schema.Store.Delete(apiOp, e.Schema, id) + obj.Type = oldSchema.ID + return obj, err +} + +func (e *Store) ByID(apiOp *types.APIRequest, oldSchema *types.APISchema, id string) (types.APIObject, error) { + obj, err := e.Schema.Store.ByID(apiOp, e.Schema, id) + obj.Type = oldSchema.ID + return obj, err +} + +func (e *Store) List(apiOp *types.APIRequest, oldSchema *types.APISchema) (types.APIObjectList, error) { + obj, err := e.Schema.Store.List(apiOp, e.Schema) + for i := range obj.Objects { + obj.Objects[i].Type = oldSchema.ID + } + return obj, err +} + +func (e *Store) Create(apiOp *types.APIRequest, oldSchema *types.APISchema, data types.APIObject) (types.APIObject, error) { + obj, err := e.Schema.Store.Create(apiOp, e.Schema, data) + obj.Type = oldSchema.ID + return obj, err +} + +func (e *Store) Update(apiOp *types.APIRequest, oldSchema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { + obj, err := e.Schema.Store.Update(apiOp, e.Schema, data, id) + obj.Type = oldSchema.ID + return obj, err +} + +func (e *Store) Watch(apiOp *types.APIRequest, oldSchema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { + c, err := e.Schema.Store.Watch(apiOp, e.Schema, wr) + if err != nil || c == nil { + return c, err + } + + result := make(chan types.APIEvent) + go func() { + defer close(result) + for obj := range c { + if obj.Object.Type == e.Schema.ID { + obj.Object.Type = oldSchema.ID + } + result <- obj + } + }() + + return result, nil +}