mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
add self linking to apiserver
This commit is contained in:
parent
b972f72248
commit
37e505601e
@ -45,11 +45,11 @@ const (
|
|||||||
StatusUnprocessableEntity = 422
|
StatusUnprocessableEntity = 422
|
||||||
)
|
)
|
||||||
|
|
||||||
// Handle returns a Handler function that expose the provided storage interfaces
|
// Handle returns a Handler function that exposes the provided storage interfaces
|
||||||
// as RESTful resources at prefix, serialized by codec, and also includes the support
|
// as RESTful resources at prefix, serialized by codec, and also includes the support
|
||||||
// http resources.
|
// http resources.
|
||||||
func Handle(storage map[string]RESTStorage, codec runtime.Codec, prefix string) http.Handler {
|
func Handle(storage map[string]RESTStorage, codec runtime.Codec, prefix string, selfLinker runtime.SelfLinker) http.Handler {
|
||||||
group := NewAPIGroup(storage, codec)
|
group := NewAPIGroup(storage, codec, prefix, selfLinker)
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
group.InstallREST(mux, prefix)
|
group.InstallREST(mux, prefix)
|
||||||
@ -72,11 +72,13 @@ type APIGroup struct {
|
|||||||
// This is a helper method for registering multiple sets of REST handlers under different
|
// This is a helper method for registering multiple sets of REST handlers under different
|
||||||
// prefixes onto a server.
|
// prefixes onto a server.
|
||||||
// TODO: add multitype codec serialization
|
// TODO: add multitype codec serialization
|
||||||
func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec) *APIGroup {
|
func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalPrefix string, selfLinker runtime.SelfLinker) *APIGroup {
|
||||||
return &APIGroup{RESTHandler{
|
return &APIGroup{RESTHandler{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
codec: codec,
|
codec: codec,
|
||||||
ops: NewOperations(),
|
canonicalPrefix: canonicalPrefix,
|
||||||
|
selfLinker: selfLinker,
|
||||||
|
ops: NewOperations(),
|
||||||
// Delay just long enough to handle most simple write operations
|
// Delay just long enough to handle most simple write operations
|
||||||
asyncOpWait: time.Millisecond * 25,
|
asyncOpWait: time.Millisecond * 25,
|
||||||
}}
|
}}
|
||||||
|
@ -45,6 +45,7 @@ func convert(obj runtime.Object) (runtime.Object, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var codec = latest.Codec
|
var codec = latest.Codec
|
||||||
|
var selfLinker = latest.SelfLinker
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{})
|
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{})
|
||||||
@ -193,7 +194,7 @@ func TestNotFound(t *testing.T) {
|
|||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": &SimpleRESTStorage{},
|
"foo": &SimpleRESTStorage{},
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
for k, v := range cases {
|
for k, v := range cases {
|
||||||
@ -214,7 +215,7 @@ func TestNotFound(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestVersion(t *testing.T) {
|
func TestVersion(t *testing.T) {
|
||||||
handler := Handle(map[string]RESTStorage{}, codec, "/prefix/version")
|
handler := Handle(map[string]RESTStorage{}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
|
||||||
@ -243,7 +244,11 @@ func TestSimpleList(t *testing.T) {
|
|||||||
storage := map[string]RESTStorage{}
|
storage := map[string]RESTStorage{}
|
||||||
simpleStorage := SimpleRESTStorage{}
|
simpleStorage := SimpleRESTStorage{}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
expectedSet: "/prefix/version/simple",
|
||||||
|
}
|
||||||
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
resp, err := http.Get(server.URL + "/prefix/version/simple")
|
resp, err := http.Get(server.URL + "/prefix/version/simple")
|
||||||
@ -254,6 +259,9 @@ func TestSimpleList(t *testing.T) {
|
|||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
t.Errorf("Unexpected status: %d, Expected: %d, %#v", resp.StatusCode, http.StatusOK, resp)
|
t.Errorf("Unexpected status: %d, Expected: %d, %#v", resp.StatusCode, http.StatusOK, resp)
|
||||||
}
|
}
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("Never set self link")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestErrorList(t *testing.T) {
|
func TestErrorList(t *testing.T) {
|
||||||
@ -262,7 +270,7 @@ func TestErrorList(t *testing.T) {
|
|||||||
errors: map[string]error{"list": fmt.Errorf("test Error")},
|
errors: map[string]error{"list": fmt.Errorf("test Error")},
|
||||||
}
|
}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
resp, err := http.Get(server.URL + "/prefix/version/simple")
|
resp, err := http.Get(server.URL + "/prefix/version/simple")
|
||||||
@ -286,7 +294,7 @@ func TestNonEmptyList(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
resp, err := http.Get(server.URL + "/prefix/version/simple")
|
resp, err := http.Get(server.URL + "/prefix/version/simple")
|
||||||
@ -320,8 +328,12 @@ func TestGet(t *testing.T) {
|
|||||||
Name: "foo",
|
Name: "foo",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
expectedSet: "/prefix/version/simple/id",
|
||||||
|
}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
resp, err := http.Get(server.URL + "/prefix/version/simple/id")
|
resp, err := http.Get(server.URL + "/prefix/version/simple/id")
|
||||||
@ -334,6 +346,9 @@ func TestGet(t *testing.T) {
|
|||||||
if itemOut.Name != simpleStorage.item.Name {
|
if itemOut.Name != simpleStorage.item.Name {
|
||||||
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
|
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
|
||||||
}
|
}
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("Never set self link")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetMissing(t *testing.T) {
|
func TestGetMissing(t *testing.T) {
|
||||||
@ -342,7 +357,7 @@ func TestGetMissing(t *testing.T) {
|
|||||||
errors: map[string]error{"get": apierrs.NewNotFound("simple", "id")},
|
errors: map[string]error{"get": apierrs.NewNotFound("simple", "id")},
|
||||||
}
|
}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
resp, err := http.Get(server.URL + "/prefix/version/simple/id")
|
resp, err := http.Get(server.URL + "/prefix/version/simple/id")
|
||||||
@ -360,7 +375,7 @@ func TestDelete(t *testing.T) {
|
|||||||
simpleStorage := SimpleRESTStorage{}
|
simpleStorage := SimpleRESTStorage{}
|
||||||
ID := "id"
|
ID := "id"
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
@ -382,7 +397,7 @@ func TestDeleteMissing(t *testing.T) {
|
|||||||
errors: map[string]error{"delete": apierrs.NewNotFound("simple", ID)},
|
errors: map[string]error{"delete": apierrs.NewNotFound("simple", ID)},
|
||||||
}
|
}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
@ -402,7 +417,11 @@ func TestUpdate(t *testing.T) {
|
|||||||
simpleStorage := SimpleRESTStorage{}
|
simpleStorage := SimpleRESTStorage{}
|
||||||
ID := "id"
|
ID := "id"
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
expectedSet: "/prefix/version/simple/" + ID,
|
||||||
|
}
|
||||||
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
item := &Simple{
|
item := &Simple{
|
||||||
@ -423,6 +442,9 @@ func TestUpdate(t *testing.T) {
|
|||||||
if simpleStorage.updated.Name != item.Name {
|
if simpleStorage.updated.Name != item.Name {
|
||||||
t.Errorf("Unexpected update value %#v, expected %#v.", simpleStorage.updated, item)
|
t.Errorf("Unexpected update value %#v, expected %#v.", simpleStorage.updated, item)
|
||||||
}
|
}
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("Never set self link")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateMissing(t *testing.T) {
|
func TestUpdateMissing(t *testing.T) {
|
||||||
@ -432,7 +454,7 @@ func TestUpdateMissing(t *testing.T) {
|
|||||||
errors: map[string]error{"update": apierrs.NewNotFound("simple", ID)},
|
errors: map[string]error{"update": apierrs.NewNotFound("simple", ID)},
|
||||||
}
|
}
|
||||||
storage["simple"] = &simpleStorage
|
storage["simple"] = &simpleStorage
|
||||||
handler := Handle(storage, codec, "/prefix/version")
|
handler := Handle(storage, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
item := &Simple{
|
item := &Simple{
|
||||||
@ -459,7 +481,7 @@ func TestCreate(t *testing.T) {
|
|||||||
simpleStorage := &SimpleRESTStorage{}
|
simpleStorage := &SimpleRESTStorage{}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
@ -500,7 +522,7 @@ func TestCreateNotFound(t *testing.T) {
|
|||||||
// See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092.
|
// See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092.
|
||||||
errors: map[string]error{"create": apierrs.NewNotFound("simple", "id")},
|
errors: map[string]error{"create": apierrs.NewNotFound("simple", "id")},
|
||||||
},
|
},
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
|
||||||
@ -533,6 +555,23 @@ func TestParseTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type setTestSelfLinker struct {
|
||||||
|
t *testing.T
|
||||||
|
expectedSet string
|
||||||
|
id string
|
||||||
|
called bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *setTestSelfLinker) ID(runtime.Object) (string, error) { return s.id, nil }
|
||||||
|
func (*setTestSelfLinker) SelfLink(runtime.Object) (string, error) { return "", nil }
|
||||||
|
func (s *setTestSelfLinker) SetSelfLink(obj runtime.Object, selfLink string) error {
|
||||||
|
if e, a := s.expectedSet, selfLink; e != a {
|
||||||
|
s.t.Errorf("expected '%v', got '%v'", e, a)
|
||||||
|
}
|
||||||
|
s.called = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncCreate(t *testing.T) {
|
func TestSyncCreate(t *testing.T) {
|
||||||
storage := SimpleRESTStorage{
|
storage := SimpleRESTStorage{
|
||||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||||
@ -540,14 +579,19 @@ func TestSyncCreate(t *testing.T) {
|
|||||||
return obj, nil
|
return obj, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
id: "bar",
|
||||||
|
expectedSet: "/prefix/version/foo/bar",
|
||||||
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": &storage,
|
"foo": &storage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
|
||||||
simple := &Simple{
|
simple := &Simple{
|
||||||
Name: "foo",
|
Name: "bar",
|
||||||
}
|
}
|
||||||
data, _ := codec.Encode(simple)
|
data, _ := codec.Encode(simple)
|
||||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true", bytes.NewBuffer(data))
|
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true", bytes.NewBuffer(data))
|
||||||
@ -579,6 +623,9 @@ func TestSyncCreate(t *testing.T) {
|
|||||||
if response.StatusCode != http.StatusOK {
|
if response.StatusCode != http.StatusOK {
|
||||||
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response)
|
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response)
|
||||||
}
|
}
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("Never set self link")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *api.Status {
|
func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *api.Status {
|
||||||
@ -611,7 +658,7 @@ func TestAsyncDelayReturnsError(t *testing.T) {
|
|||||||
return nil, apierrs.NewAlreadyExists("foo", "bar")
|
return nil, apierrs.NewAlreadyExists("foo", "bar")
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
|
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version", selfLinker)
|
||||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
|
handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
@ -629,11 +676,16 @@ func TestAsyncCreateError(t *testing.T) {
|
|||||||
return nil, apierrs.NewAlreadyExists("foo", "bar")
|
return nil, apierrs.NewAlreadyExists("foo", "bar")
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
|
selfLinker := &setTestSelfLinker{
|
||||||
|
t: t,
|
||||||
|
id: "bar",
|
||||||
|
expectedSet: "/prefix/version/foo/bar",
|
||||||
|
}
|
||||||
|
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version", selfLinker)
|
||||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
simple := &Simple{Name: "foo"}
|
simple := &Simple{Name: "bar"}
|
||||||
data, _ := codec.Encode(simple)
|
data, _ := codec.Encode(simple)
|
||||||
|
|
||||||
status := expectApiStatus(t, "POST", fmt.Sprintf("%s/prefix/version/foo", server.URL), data, http.StatusAccepted)
|
status := expectApiStatus(t, "POST", fmt.Sprintf("%s/prefix/version/foo", server.URL), data, http.StatusAccepted)
|
||||||
@ -667,6 +719,9 @@ func TestAsyncCreateError(t *testing.T) {
|
|||||||
t.Logf("Details %#v, Got %#v", *expectedStatus.Details, *finalStatus.Details)
|
t.Logf("Details %#v, Got %#v", *expectedStatus.Details, *finalStatus.Details)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !selfLinker.called {
|
||||||
|
t.Errorf("Never set self link")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type UnregisteredAPIObject struct {
|
type UnregisteredAPIObject struct {
|
||||||
@ -723,7 +778,7 @@ func TestSyncCreateTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": &storage,
|
"foo": &storage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
simple := &Simple{Name: "foo"}
|
simple := &Simple{Name: "foo"}
|
||||||
@ -753,7 +808,10 @@ func TestCORSAllowedOrigins(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
handler := CORS(Handle(map[string]RESTStorage{}, codec, "/prefix/version"), allowedOriginRegexps, nil, nil, "true")
|
handler := CORS(
|
||||||
|
Handle(map[string]RESTStorage{}, codec, "/prefix/version", selfLinker),
|
||||||
|
allowedOriginRegexps, nil, nil, "true",
|
||||||
|
)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ func TestApiServerMinionProxy(t *testing.T) {
|
|||||||
proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
w.Write([]byte(req.URL.Path))
|
w.Write([]byte(req.URL.Path))
|
||||||
}))
|
}))
|
||||||
server := httptest.NewServer(Handle(nil, nil, "/prefix"))
|
server := httptest.NewServer(Handle(nil, nil, "/prefix", selfLinker))
|
||||||
proxy, _ := url.Parse(proxyServer.URL)
|
proxy, _ := url.Parse(proxyServer.URL)
|
||||||
resp, err := http.Get(fmt.Sprintf("%s/proxy/minion/%s%s", server.URL, proxy.Host, "/test"))
|
resp, err := http.Get(fmt.Sprintf("%s/proxy/minion/%s%s", server.URL, proxy.Host, "/test"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -63,12 +63,13 @@ func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
|
|
||||||
// Operation represents an ongoing action which the server is performing.
|
// Operation represents an ongoing action which the server is performing.
|
||||||
type Operation struct {
|
type Operation struct {
|
||||||
ID string
|
ID string
|
||||||
result runtime.Object
|
result runtime.Object
|
||||||
awaiting <-chan runtime.Object
|
onReceive func(runtime.Object)
|
||||||
finished *time.Time
|
awaiting <-chan runtime.Object
|
||||||
lock sync.Mutex
|
finished *time.Time
|
||||||
notify chan struct{}
|
lock sync.Mutex
|
||||||
|
notify chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Operations tracks all the ongoing operations.
|
// Operations tracks all the ongoing operations.
|
||||||
@ -90,13 +91,15 @@ func NewOperations() *Operations {
|
|||||||
return ops
|
return ops
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOperation adds a new operation. It is lock-free.
|
// NewOperation adds a new operation. It is lock-free. 'onReceive' will be called
|
||||||
func (ops *Operations) NewOperation(from <-chan runtime.Object) *Operation {
|
// with the value read from 'from', when it is read.
|
||||||
|
func (ops *Operations) NewOperation(from <-chan runtime.Object, onReceive func(runtime.Object)) *Operation {
|
||||||
id := atomic.AddInt64(&ops.lastID, 1)
|
id := atomic.AddInt64(&ops.lastID, 1)
|
||||||
op := &Operation{
|
op := &Operation{
|
||||||
ID: strconv.FormatInt(id, 10),
|
ID: strconv.FormatInt(id, 10),
|
||||||
awaiting: from,
|
awaiting: from,
|
||||||
notify: make(chan struct{}),
|
onReceive: onReceive,
|
||||||
|
notify: make(chan struct{}),
|
||||||
}
|
}
|
||||||
go op.wait()
|
go op.wait()
|
||||||
go ops.insert(op)
|
go ops.insert(op)
|
||||||
@ -159,6 +162,9 @@ func (op *Operation) wait() {
|
|||||||
|
|
||||||
op.lock.Lock()
|
op.lock.Lock()
|
||||||
defer op.lock.Unlock()
|
defer op.lock.Unlock()
|
||||||
|
if op.onReceive != nil {
|
||||||
|
op.onReceive(result)
|
||||||
|
}
|
||||||
op.result = result
|
op.result = result
|
||||||
finished := time.Now()
|
finished := time.Now()
|
||||||
op.finished = &finished
|
op.finished = &finished
|
||||||
|
@ -35,7 +35,8 @@ func TestOperation(t *testing.T) {
|
|||||||
ops := NewOperations()
|
ops := NewOperations()
|
||||||
|
|
||||||
c := make(chan runtime.Object)
|
c := make(chan runtime.Object)
|
||||||
op := ops.NewOperation(c)
|
called := make(chan struct{})
|
||||||
|
op := ops.NewOperation(c, func(runtime.Object) { go close(called) })
|
||||||
// Allow context switch, so that op's ID can get added to the map and Get will work.
|
// Allow context switch, so that op's ID can get added to the map and Get will work.
|
||||||
// This is just so we can test Get. Ordinary users have no need to call Get immediately
|
// This is just so we can test Get. Ordinary users have no need to call Get immediately
|
||||||
// after calling NewOperation, because it returns the operation directly.
|
// after calling NewOperation, because it returns the operation directly.
|
||||||
@ -72,6 +73,11 @@ func TestOperation(t *testing.T) {
|
|||||||
t.Errorf("Unexpectedly slow completion")
|
t.Errorf("Unexpectedly slow completion")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, open := <-called
|
||||||
|
if open {
|
||||||
|
t.Errorf("expected hook to be called!")
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
finished := atomic.LoadInt32(&waited)
|
finished := atomic.LoadInt32(&waited)
|
||||||
if finished != waiters {
|
if finished != waiters {
|
||||||
@ -107,7 +113,7 @@ func TestOperationsList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
@ -163,7 +169,7 @@ func TestOpGet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
@ -161,7 +161,7 @@ func TestProxy(t *testing.T) {
|
|||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
req, err := http.NewRequest(
|
req, err := http.NewRequest(
|
||||||
|
@ -30,7 +30,7 @@ func TestRedirect(t *testing.T) {
|
|||||||
}
|
}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
dontFollow := errors.New("don't follow")
|
dontFollow := errors.New("don't follow")
|
||||||
|
@ -18,19 +18,24 @@ package apiserver
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RESTHandler struct {
|
type RESTHandler struct {
|
||||||
storage map[string]RESTStorage
|
storage map[string]RESTStorage
|
||||||
codec runtime.Codec
|
codec runtime.Codec
|
||||||
ops *Operations
|
canonicalPrefix string
|
||||||
asyncOpWait time.Duration
|
selfLinker runtime.SelfLinker
|
||||||
|
ops *Operations
|
||||||
|
asyncOpWait time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServeHTTP handles requests to all RESTStorage objects.
|
// ServeHTTP handles requests to all RESTStorage objects.
|
||||||
@ -50,6 +55,37 @@ func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
h.handleRESTStorage(parts, req, w, storage)
|
h.handleRESTStorage(parts, req, w, storage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sets the SelfLink field of the object.
|
||||||
|
func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error {
|
||||||
|
newURL := *req.URL
|
||||||
|
newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path)
|
||||||
|
newURL.RawQuery = ""
|
||||||
|
newURL.Fragment = ""
|
||||||
|
return h.selfLinker.SetSelfLink(obj, newURL.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Like setSelfLink, but appends the object's id.
|
||||||
|
func (h *RESTHandler) setSelfLinkAddID(obj runtime.Object, req *http.Request) error {
|
||||||
|
id, err := h.selfLinker.ID(obj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
newURL := *req.URL
|
||||||
|
newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, id)
|
||||||
|
newURL.RawQuery = ""
|
||||||
|
newURL.Fragment = ""
|
||||||
|
return h.selfLinker.SetSelfLink(obj, newURL.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// curry adapts either of the self link setting functions into a function appropriate for operation's hook.
|
||||||
|
func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(runtime.Object) {
|
||||||
|
return func(obj runtime.Object) {
|
||||||
|
if err := f(obj, req); err != nil {
|
||||||
|
glog.Errorf("unable to set self link for %#v: %v", obj, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then
|
// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then
|
||||||
// on path length, according to the following table:
|
// on path length, according to the following table:
|
||||||
// Method Path Action
|
// Method Path Action
|
||||||
@ -86,6 +122,10 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if err := h.setSelfLink(list, req); err != nil {
|
||||||
|
errorJSON(err, h.codec, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
writeJSON(http.StatusOK, h.codec, list, w)
|
writeJSON(http.StatusOK, h.codec, list, w)
|
||||||
case 2:
|
case 2:
|
||||||
item, err := storage.Get(ctx, parts[1])
|
item, err := storage.Get(ctx, parts[1])
|
||||||
@ -93,6 +133,10 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if err := h.setSelfLink(item, req); err != nil {
|
||||||
|
errorJSON(err, h.codec, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
writeJSON(http.StatusOK, h.codec, item, w)
|
writeJSON(http.StatusOK, h.codec, item, w)
|
||||||
default:
|
default:
|
||||||
notFound(w, req)
|
notFound(w, req)
|
||||||
@ -119,7 +163,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
op := h.createOperation(out, sync, timeout)
|
op := h.createOperation(out, sync, timeout, curry(h.setSelfLinkAddID, req))
|
||||||
h.finishReq(op, req, w)
|
h.finishReq(op, req, w)
|
||||||
|
|
||||||
case "DELETE":
|
case "DELETE":
|
||||||
@ -132,7 +176,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
op := h.createOperation(out, sync, timeout)
|
op := h.createOperation(out, sync, timeout, nil)
|
||||||
h.finishReq(op, req, w)
|
h.finishReq(op, req, w)
|
||||||
|
|
||||||
case "PUT":
|
case "PUT":
|
||||||
@ -156,7 +200,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
op := h.createOperation(out, sync, timeout)
|
op := h.createOperation(out, sync, timeout, curry(h.setSelfLink, req))
|
||||||
h.finishReq(op, req, w)
|
h.finishReq(op, req, w)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -165,8 +209,8 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createOperation creates an operation to process a channel response.
|
// createOperation creates an operation to process a channel response.
|
||||||
func (h *RESTHandler) createOperation(out <-chan runtime.Object, sync bool, timeout time.Duration) *Operation {
|
func (h *RESTHandler) createOperation(out <-chan runtime.Object, sync bool, timeout time.Duration, onReceive func(runtime.Object)) *Operation {
|
||||||
op := h.ops.NewOperation(out)
|
op := h.ops.NewOperation(out, onReceive)
|
||||||
if sync {
|
if sync {
|
||||||
op.WaitFor(timeout)
|
op.WaitFor(timeout)
|
||||||
} else if h.asyncOpWait != 0 {
|
} else if h.asyncOpWait != 0 {
|
||||||
|
@ -49,7 +49,7 @@ func TestWatchWebsocket(t *testing.T) {
|
|||||||
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
|
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
dest, _ := url.Parse(server.URL)
|
dest, _ := url.Parse(server.URL)
|
||||||
@ -95,7 +95,7 @@ func TestWatchHTTP(t *testing.T) {
|
|||||||
simpleStorage := &SimpleRESTStorage{}
|
simpleStorage := &SimpleRESTStorage{}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
|
||||||
@ -148,7 +148,7 @@ func TestWatchParamParsing(t *testing.T) {
|
|||||||
simpleStorage := &SimpleRESTStorage{}
|
simpleStorage := &SimpleRESTStorage{}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
|
|
||||||
dest, _ := url.Parse(server.URL)
|
dest, _ := url.Parse(server.URL)
|
||||||
@ -210,7 +210,7 @@ func TestWatchProtocolSelection(t *testing.T) {
|
|||||||
simpleStorage := &SimpleRESTStorage{}
|
simpleStorage := &SimpleRESTStorage{}
|
||||||
handler := Handle(map[string]RESTStorage{
|
handler := Handle(map[string]RESTStorage{
|
||||||
"foo": simpleStorage,
|
"foo": simpleStorage,
|
||||||
}, codec, "/prefix/version")
|
}, codec, "/prefix/version", selfLinker)
|
||||||
server := httptest.NewServer(handler)
|
server := httptest.NewServer(handler)
|
||||||
client := http.Client{}
|
client := http.Client{}
|
||||||
|
|
||||||
|
@ -152,19 +152,19 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
|
|||||||
}
|
}
|
||||||
|
|
||||||
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
||||||
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec) {
|
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) {
|
||||||
storage := make(map[string]apiserver.RESTStorage)
|
storage := make(map[string]apiserver.RESTStorage)
|
||||||
for k, v := range m.storage {
|
for k, v := range m.storage {
|
||||||
storage[k] = v
|
storage[k] = v
|
||||||
}
|
}
|
||||||
return storage, v1beta1.Codec
|
return storage, v1beta1.Codec, "/api/v1beta1", latest.SelfLinker
|
||||||
}
|
}
|
||||||
|
|
||||||
// API_v1beta2 returns the resources and codec for API version v1beta2.
|
// API_v1beta2 returns the resources and codec for API version v1beta2.
|
||||||
func (m *Master) API_v1beta2() (map[string]apiserver.RESTStorage, runtime.Codec) {
|
func (m *Master) API_v1beta2() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) {
|
||||||
storage := make(map[string]apiserver.RESTStorage)
|
storage := make(map[string]apiserver.RESTStorage)
|
||||||
for k, v := range m.storage {
|
for k, v := range m.storage {
|
||||||
storage[k] = v
|
storage[k] = v
|
||||||
}
|
}
|
||||||
return storage, v1beta2.Codec
|
return storage, v1beta2.Codec, "/api/v1beta1", latest.SelfLinker
|
||||||
}
|
}
|
||||||
|
@ -45,20 +45,22 @@ func TestClient(t *testing.T) {
|
|||||||
m := master.New(&master.Config{
|
m := master.New(&master.Config{
|
||||||
EtcdHelper: helper,
|
EtcdHelper: helper,
|
||||||
})
|
})
|
||||||
s1, c1 := m.API_v1beta1()
|
s1, c1, loc1, sl1 := m.API_v1beta1()
|
||||||
s2, c2 := m.API_v1beta2()
|
s2, c2, loc2, sl2 := m.API_v1beta2()
|
||||||
|
|
||||||
testCases := map[string]struct {
|
testCases := map[string]struct {
|
||||||
Storage map[string]apiserver.RESTStorage
|
Storage map[string]apiserver.RESTStorage
|
||||||
Codec runtime.Codec
|
Codec runtime.Codec
|
||||||
|
location string
|
||||||
|
selfLinker runtime.SelfLinker
|
||||||
}{
|
}{
|
||||||
"v1beta1": {s1, c1},
|
"v1beta1": {s1, c1, loc1, sl1},
|
||||||
"v1beta2": {s2, c2},
|
"v1beta2": {s2, c2, loc2, sl2},
|
||||||
}
|
}
|
||||||
|
|
||||||
for apiVersion, values := range testCases {
|
for apiVersion, values := range testCases {
|
||||||
deleteAllEtcdKeys()
|
deleteAllEtcdKeys()
|
||||||
s := httptest.NewServer(apiserver.Handle(values.Storage, values.Codec, fmt.Sprintf("/api/%s/", apiVersion)))
|
s := httptest.NewServer(apiserver.Handle(values.Storage, values.Codec, fmt.Sprintf("/api/%s/", apiVersion), values.selfLinker))
|
||||||
client := client.NewOrDie(s.URL, apiVersion, nil)
|
client := client.NewOrDie(s.URL, apiVersion, nil)
|
||||||
|
|
||||||
info, err := client.ServerVersion()
|
info, err := client.ServerVersion()
|
||||||
|
Loading…
Reference in New Issue
Block a user