1
0
mirror of https://github.com/rancher/norman.git synced 2025-07-11 14:24:07 +00:00
norman/store/proxy/proxy_store.go

284 lines
7.3 KiB
Go
Raw Normal View History

2017-11-11 04:44:02 +00:00
package proxy
import (
2017-11-28 21:28:25 +00:00
ejson "encoding/json"
2017-11-11 04:44:02 +00:00
"strings"
2017-12-18 20:56:50 +00:00
"net/http"
2017-11-11 04:44:02 +00:00
"github.com/rancher/norman/types"
"github.com/rancher/norman/types/convert"
2017-11-29 21:27:02 +00:00
"github.com/rancher/norman/types/values"
2017-11-11 04:44:02 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2017-11-28 21:28:25 +00:00
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
2017-11-11 04:44:02 +00:00
"k8s.io/client-go/rest"
2017-11-28 21:28:25 +00:00
restclientwatch "k8s.io/client-go/rest/watch"
2017-11-11 04:44:02 +00:00
)
2017-12-16 20:49:21 +00:00
// authHeaders lists the request headers that doAuthed copies from the
// incoming API request onto the outgoing request.
var authHeaders = []string{
	"Impersonate-User",
	"Impersonate-Group",
}
2017-11-11 04:44:02 +00:00
type Store struct {
2017-11-21 20:46:30 +00:00
k8sClient rest.Interface
2017-11-11 04:44:02 +00:00
prefix []string
group string
version string
kind string
resourcePlural string
2017-12-18 20:56:50 +00:00
authContext map[string]string
2017-11-11 04:44:02 +00:00
}
2017-11-21 20:46:30 +00:00
func NewProxyStore(k8sClient rest.Interface,
2017-11-11 04:44:02 +00:00
prefix []string, group, version, kind, resourcePlural string) *Store {
return &Store{
k8sClient: k8sClient,
prefix: prefix,
group: group,
version: version,
kind: kind,
resourcePlural: resourcePlural,
2017-12-18 20:56:50 +00:00
authContext: map[string]string{
"apiGroup": group,
"resource": resourcePlural,
},
2017-11-11 04:44:02 +00:00
}
}
2017-12-16 20:49:21 +00:00
func (p *Store) doAuthed(apiContext *types.APIContext, request *rest.Request) rest.Result {
for _, header := range authHeaders {
2017-12-18 20:56:50 +00:00
request.SetHeader(header, apiContext.Request.Header[http.CanonicalHeaderKey(header)]...)
2017-12-16 20:49:21 +00:00
}
return request.Do()
}
2017-11-11 04:44:02 +00:00
func (p *Store) ByID(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
2017-11-29 21:27:02 +00:00
_, result, err := p.byID(apiContext, schema, id)
return result, err
}
func (p *Store) byID(apiContext *types.APIContext, schema *types.Schema, id string) (string, map[string]interface{}, error) {
2017-11-11 04:44:02 +00:00
namespace, id := splitID(id)
req := p.common(namespace, p.k8sClient.Get()).
Name(id)
2017-12-16 20:49:21 +00:00
return p.singleResult(apiContext, schema, req)
2017-11-11 04:44:02 +00:00
}
2017-11-21 20:46:30 +00:00
func (p *Store) List(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) ([]map[string]interface{}, error) {
2017-11-11 04:44:02 +00:00
namespace := getNamespace(apiContext, opt)
req := p.common(namespace, p.k8sClient.Get())
resultList := &unstructured.UnstructuredList{}
err := req.Do().Into(resultList)
if err != nil {
return nil, err
}
2017-11-21 20:46:30 +00:00
var result []map[string]interface{}
2017-11-11 04:44:02 +00:00
for _, obj := range resultList.Items {
result = append(result, p.fromInternal(schema, obj.Object))
}
2017-12-18 20:56:50 +00:00
return apiContext.AccessControl.FilterList(apiContext, result, p.authContext), nil
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
func (p *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) {
namespace := getNamespace(apiContext, opt)
req := p.common(namespace, p.k8sClient.Get())
req.VersionedParams(&metav1.ListOptions{
Watch: true,
}, dynamic.VersionedParameterEncoderWithV1Fallback)
body, err := req.Stream()
if err != nil {
return nil, err
}
framer := json.Framer.NewFrameReader(body)
decoder := streaming.NewDecoder(framer, &unstructuredDecoder{})
watcher := watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, &unstructuredDecoder{}))
go func() {
<-apiContext.Request.Context().Done()
watcher.Stop()
}()
result := make(chan map[string]interface{})
go func() {
for event := range watcher.ResultChan() {
data := event.Object.(*unstructured.Unstructured)
p.fromInternal(schema, data.Object)
2017-12-18 20:56:50 +00:00
result <- apiContext.AccessControl.Filter(apiContext, data.Object, p.authContext)
2017-11-28 21:28:25 +00:00
}
close(result)
}()
return result, nil
}
// unstructuredDecoder decodes raw JSON with encoding/json, without consulting
// any type registry.
type unstructuredDecoder struct{}

// Decode unmarshals data as JSON into into. When into is nil, a fresh
// *unstructured.Unstructured is used as the target. defaults is returned
// unchanged; it is not consulted during decoding.
func (d *unstructuredDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	if into == nil {
		into = &unstructured.Unstructured{}
	}

	err := ejson.Unmarshal(data, &into)
	return into, defaults, err
}
2017-11-21 20:46:30 +00:00
func getNamespace(apiContext *types.APIContext, opt types.QueryOptions) string {
2017-11-28 21:28:25 +00:00
if val, ok := apiContext.SubContext["namespaces"]; ok {
2017-11-11 04:44:02 +00:00
return convert.ToString(val)
}
for _, condition := range opt.Conditions {
2017-11-28 21:28:25 +00:00
if condition.Field == "namespaceId" && condition.Value != "" {
2017-11-21 20:46:30 +00:00
return condition.Value
2017-11-11 04:44:02 +00:00
}
}
return ""
}
func (p *Store) Create(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) (map[string]interface{}, error) {
2017-11-28 21:28:25 +00:00
namespace, _ := data["namespaceId"].(string)
2017-11-11 04:44:02 +00:00
p.toInternal(schema.Mapper, data)
2017-11-29 21:27:02 +00:00
name, _ := values.GetValueN(data, "metadata", "name").(string)
2017-11-21 20:46:30 +00:00
if name == "" {
2017-11-29 21:27:02 +00:00
generated, _ := values.GetValueN(data, "metadata", "generateName").(string)
2017-11-21 20:46:30 +00:00
if generated == "" {
2017-11-29 21:27:02 +00:00
values.PutValue(data, strings.ToLower(schema.ID+"-"), "metadata", "generateName")
2017-11-21 20:46:30 +00:00
}
}
2017-11-11 04:44:02 +00:00
req := p.common(namespace, p.k8sClient.Post()).
Body(&unstructured.Unstructured{
Object: data,
})
2017-12-16 20:49:21 +00:00
_, result, err := p.singleResult(apiContext, schema, req)
2017-11-29 21:27:02 +00:00
return result, err
2017-11-11 04:44:02 +00:00
}
// toInternal converts data in place for sending upstream: it applies
// mapper.ToInternal when a mapper is supplied, then stamps "apiVersion"
// ("group/version", or just the version when the group is empty) and "kind".
func (p *Store) toInternal(mapper types.Mapper, data map[string]interface{}) {
	if mapper != nil {
		mapper.ToInternal(data)
	}

	apiVersion := p.version
	if p.group != "" {
		apiVersion = p.group + "/" + p.version
	}

	data["apiVersion"] = apiVersion
	data["kind"] = p.kind
}
func (p *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) {
2017-11-29 21:27:02 +00:00
resourceVersion, existing, err := p.byID(apiContext, schema, id)
2017-11-11 04:44:02 +00:00
if err != nil {
return data, nil
}
for k, v := range data {
existing[k] = v
}
p.toInternal(schema.Mapper, existing)
namespace, id := splitID(id)
2017-11-29 21:27:02 +00:00
values.PutValue(existing, resourceVersion, "metadata", "resourceVersion")
2017-11-11 04:44:02 +00:00
req := p.common(namespace, p.k8sClient.Put()).
Body(&unstructured.Unstructured{
Object: existing,
}).
Name(id)
2017-12-16 20:49:21 +00:00
_, result, err := p.singleResult(apiContext, schema, req)
2017-11-29 21:27:02 +00:00
return result, err
2017-11-11 04:44:02 +00:00
}
func (p *Store) Delete(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
2017-11-11 04:44:02 +00:00
namespace, id := splitID(id)
prop := metav1.DeletePropagationForeground
req := p.common(namespace, p.k8sClient.Delete()).
Body(&metav1.DeleteOptions{
PropagationPolicy: &prop,
}).
Name(id)
2017-12-16 20:49:21 +00:00
err := p.doAuthed(apiContext, req).Error()
if err != nil {
return nil, err
}
obj, err := p.ByID(apiContext, schema, id)
if err != nil {
return nil, nil
}
return obj, nil
2017-11-11 04:44:02 +00:00
}
2017-12-16 20:49:21 +00:00
func (p *Store) singleResult(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
2017-11-11 04:44:02 +00:00
result := &unstructured.Unstructured{}
2017-12-16 20:49:21 +00:00
err := p.doAuthed(apiContext, req).Into(result)
2017-11-11 04:44:02 +00:00
if err != nil {
2017-11-29 21:27:02 +00:00
return "", nil, err
2017-11-11 04:44:02 +00:00
}
2017-11-29 21:27:02 +00:00
version := result.GetResourceVersion()
2017-11-11 04:44:02 +00:00
p.fromInternal(schema, result.Object)
2017-11-29 21:27:02 +00:00
return version, result.Object, nil
2017-11-11 04:44:02 +00:00
}
// splitID splits an id of the form "namespace:name" at the first colon and
// returns (namespace, name). An id without a colon is returned unchanged as
// the name, with an empty namespace.
func splitID(id string) (string, string) {
	sep := strings.Index(id, ":")
	if sep < 0 {
		return "", id
	}
	return id[:sep], id[sep+1:]
}
// common applies the path settings shared by every request to req: the
// store's prefix followed by the group (when non-empty) and version, the
// plural resource name, and the namespace (when non-empty). The same req is
// returned.
func (p *Store) common(namespace string, req *rest.Request) *rest.Request {
	// Build into a fresh slice so p.prefix is never appended to in place.
	segments := make([]string, 0, len(p.prefix)+2)
	segments = append(segments, p.prefix...)
	if p.group != "" {
		segments = append(segments, p.group)
	}
	segments = append(segments, p.version)

	req.Prefix(segments...).Resource(p.resourcePlural)

	if namespace != "" {
		req.Namespace(namespace)
	}

	return req
}
// fromInternal applies the schema's mapper (when one is set) to data in place
// and returns the same map.
func (p *Store) fromInternal(schema *types.Schema, data map[string]interface{}) map[string]interface{} {
	if mapper := schema.Mapper; mapper != nil {
		mapper.FromInternal(data)
	}
	return data
}