mirror of
https://github.com/niusmallnan/steve.git
synced 2025-09-17 23:38:37 +00:00
Initial implmentation of warning headers
Attempts to pass through warning headers which k8s returns. Requires an update to rancher/apiserver.
This commit is contained in:
committed by
Chad Roberts
parent
7565dba268
commit
956b7351aa
@@ -5,6 +5,8 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
@@ -33,7 +35,7 @@ type ParallelPartitionLister struct {
|
||||
}
|
||||
|
||||
// PartitionLister lists objects for one partition.
|
||||
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error)
|
||||
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error)
|
||||
|
||||
// Err returns the latest error encountered.
|
||||
func (p *ParallelPartitionLister) Err() error {
|
||||
@@ -174,7 +176,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
|
||||
if partition.Name() == state.PartitionName {
|
||||
cont = state.Continue
|
||||
}
|
||||
list, err := p.Lister(ctx, partition, cont, state.Revision, limit)
|
||||
list, _, err := p.Lister(ctx, partition, cont, state.Revision, limit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -79,11 +79,11 @@ type cacheKey struct {
|
||||
|
||||
// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types.
|
||||
type UnstructuredStore interface {
|
||||
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
|
||||
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error)
|
||||
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error)
|
||||
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error)
|
||||
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
|
||||
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error)
|
||||
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error)
|
||||
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, []types.Warning, error)
|
||||
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error)
|
||||
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error)
|
||||
Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error)
|
||||
}
|
||||
|
||||
@@ -103,11 +103,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
obj, err := target.Delete(apiOp, schema, id)
|
||||
obj, warnings, err := target.Delete(apiOp, schema, id)
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
return toAPI(schema, obj), nil
|
||||
return toAPI(schema, obj, warnings), nil
|
||||
}
|
||||
|
||||
// ByID looks up a single object by its ID.
|
||||
@@ -117,18 +117,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
obj, err := target.ByID(apiOp, schema, id)
|
||||
obj, warnings, err := target.ByID(apiOp, schema, id)
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
return toAPI(schema, obj), nil
|
||||
return toAPI(schema, obj, warnings), nil
|
||||
}
|
||||
|
||||
func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
|
||||
cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
|
||||
cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) {
|
||||
store, err := s.Partitioner.Store(apiOp, partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
req := apiOp.Clone()
|
||||
@@ -163,7 +163,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
||||
}
|
||||
|
||||
lister := ParallelPartitionLister{
|
||||
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
|
||||
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) {
|
||||
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
|
||||
},
|
||||
Concurrency: 3,
|
||||
@@ -217,7 +217,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
||||
|
||||
for _, item := range list {
|
||||
item := item
|
||||
result.Objects = append(result.Objects, toAPI(schema, &item))
|
||||
result.Objects = append(result.Objects, toAPI(schema, &item, nil))
|
||||
}
|
||||
|
||||
result.Revision = key.revision
|
||||
@@ -254,11 +254,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
obj, err := target.Create(apiOp, schema, data)
|
||||
obj, warnings, err := target.Create(apiOp, schema, data)
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
return toAPI(schema, obj), nil
|
||||
return toAPI(schema, obj, warnings), nil
|
||||
}
|
||||
|
||||
// Update updates a single object in the store.
|
||||
@@ -268,11 +268,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
obj, err := target.Update(apiOp, schema, data, id)
|
||||
obj, warnings, err := target.Update(apiOp, schema, data, id)
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
return toAPI(schema, obj), nil
|
||||
return toAPI(schema, obj, warnings), nil
|
||||
}
|
||||
|
||||
// Watch returns a channel of events for a list or resource.
|
||||
@@ -318,7 +318,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
|
||||
func toAPI(schema *types.APISchema, obj runtime.Object, warnings []types.Warning) types.APIObject {
|
||||
if obj == nil || reflect.ValueOf(obj).IsNil() {
|
||||
return types.APIObject{}
|
||||
}
|
||||
@@ -344,6 +344,7 @@ func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
|
||||
}
|
||||
|
||||
apiObject.ID = id
|
||||
apiObject.Warnings = warnings
|
||||
return apiObject
|
||||
}
|
||||
|
||||
@@ -384,7 +385,7 @@ func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Ev
|
||||
return apiEvent
|
||||
}
|
||||
|
||||
apiEvent.Object = toAPI(schema, event.Object)
|
||||
apiEvent.Object = toAPI(schema, event.Object, nil)
|
||||
|
||||
m, err := meta.Accessor(event.Object)
|
||||
if err != nil {
|
||||
|
@@ -1881,12 +1881,12 @@ type mockStore struct {
|
||||
called int
|
||||
}
|
||||
|
||||
func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
||||
func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) {
|
||||
m.called++
|
||||
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
|
||||
l := query.Get("limit")
|
||||
if l == "" {
|
||||
return m.contents, nil
|
||||
return m.contents, nil, nil
|
||||
}
|
||||
i := 0
|
||||
if c := query.Get("continue"); c != "" {
|
||||
@@ -1904,29 +1904,29 @@ func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*uns
|
||||
contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName())))
|
||||
}
|
||||
if i > len(contents.Items) {
|
||||
return contents, nil
|
||||
return contents, nil, nil
|
||||
}
|
||||
if i+lInt > len(contents.Items) {
|
||||
contents.Items = contents.Items[i:]
|
||||
return contents, nil
|
||||
return contents, nil, nil
|
||||
}
|
||||
contents.Items = contents.Items[i : i+lInt]
|
||||
return contents, nil
|
||||
return contents, nil, nil
|
||||
}
|
||||
|
||||
func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
||||
func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) {
|
||||
func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, []types.Warning, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) {
|
||||
func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
||||
func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
@@ -1939,7 +1939,7 @@ type mockVersionedStore struct {
|
||||
versions []mockStore
|
||||
}
|
||||
|
||||
func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
||||
func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) {
|
||||
m.called++
|
||||
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
|
||||
rv := len(m.versions) - 1
|
||||
@@ -1949,7 +1949,7 @@ func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISche
|
||||
}
|
||||
l := query.Get("limit")
|
||||
if l == "" {
|
||||
return m.versions[rv].contents, nil
|
||||
return m.versions[rv].contents, nil, nil
|
||||
}
|
||||
i := 0
|
||||
if c := query.Get("continue"); c != "" {
|
||||
@@ -1967,14 +1967,14 @@ func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISche
|
||||
contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName())))
|
||||
}
|
||||
if i > len(contents.Items) {
|
||||
return contents, nil
|
||||
return contents, nil, nil
|
||||
}
|
||||
if i+lInt > len(contents.Items) {
|
||||
contents.Items = contents.Items[i:]
|
||||
return contents, nil
|
||||
return contents, nil, nil
|
||||
}
|
||||
contents.Items = contents.Items[i : i+lInt]
|
||||
return contents, nil
|
||||
return contents, nil, nil
|
||||
}
|
||||
|
||||
type mockCache struct {
|
||||
|
Reference in New Issue
Block a user