Merge pull request #3248 from derekwaynecarr/admission_control_hooks

Implement basic admission control framework
This commit is contained in:
Eric Tune
2015-01-07 16:52:49 -08:00
25 changed files with 634 additions and 67 deletions

View File

@@ -27,6 +27,7 @@ import (
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@@ -55,9 +56,9 @@ const (
// Handle returns a Handler function that exposes the provided storage interfaces
// as RESTful resources at prefix, serialized by codec, and also includes the support
// http resources.
func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, selfLinker runtime.SelfLinker) http.Handler {
func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, selfLinker runtime.SelfLinker, admissionControl admission.Interface) http.Handler {
prefix := root + "/" + version
group := NewAPIGroupVersion(storage, codec, prefix, selfLinker)
group := NewAPIGroupVersion(storage, codec, prefix, selfLinker, admissionControl)
container := restful.NewContainer()
mux := container.ServeMux
group.InstallREST(container, root, version)
@@ -83,13 +84,14 @@ type APIGroupVersion struct {
// This is a helper method for registering multiple sets of REST handlers under different
// prefixes onto a server.
// TODO: add multitype codec serialization
func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, canonicalPrefix string, selfLinker runtime.SelfLinker) *APIGroupVersion {
func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, canonicalPrefix string, selfLinker runtime.SelfLinker, admissionControl admission.Interface) *APIGroupVersion {
return &APIGroupVersion{RESTHandler{
storage: storage,
codec: codec,
canonicalPrefix: canonicalPrefix,
selfLinker: selfLinker,
ops: NewOperations(),
storage: storage,
codec: codec,
canonicalPrefix: canonicalPrefix,
selfLinker: selfLinker,
ops: NewOperations(),
admissionControl: admissionControl,
// Delay just long enough to handle most simple write operations
asyncOpWait: time.Millisecond * 25,
}}

View File

@@ -30,6 +30,7 @@ import (
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
@@ -38,6 +39,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/deny"
)
func convert(obj runtime.Object) (runtime.Object, error) {
@@ -53,6 +56,7 @@ var accessor = meta.NewAccessor()
var versioner runtime.ResourceVersioner = accessor
var selfLinker runtime.SelfLinker = accessor
var mapper meta.RESTMapper
var admissionControl admission.Interface
func interfacesFor(version string) (*meta.VersionInterfaces, error) {
switch version {
@@ -92,6 +96,7 @@ func init() {
)
defMapper.Add(api.Scheme, true, versions...)
mapper = defMapper
admissionControl = admit.NewAlwaysAdmit()
}
type Simple struct {
@@ -262,7 +267,7 @@ func TestNotFound(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": &SimpleRESTStorage{},
}, codec, "/prefix", testVersion, selfLinker)
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@@ -284,7 +289,7 @@ func TestNotFound(t *testing.T) {
}
func TestVersion(t *testing.T) {
handler := Handle(map[string]RESTStorage{}, codec, "/prefix", testVersion, selfLinker)
handler := Handle(map[string]RESTStorage{}, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@@ -319,7 +324,7 @@ func TestSimpleList(t *testing.T) {
namespace: "other",
expectedSet: "/prefix/version/simple?namespace=other",
}
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -342,7 +347,7 @@ func TestErrorList(t *testing.T) {
errors: map[string]error{"list": fmt.Errorf("test Error")},
}
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -368,7 +373,7 @@ func TestNonEmptyList(t *testing.T) {
},
}
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -414,7 +419,7 @@ func TestGet(t *testing.T) {
expectedSet: "/prefix/version/simple/id",
}
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -439,7 +444,7 @@ func TestGetMissing(t *testing.T) {
errors: map[string]error{"get": apierrs.NewNotFound("simple", "id")},
}
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -458,7 +463,7 @@ func TestDelete(t *testing.T) {
simpleStorage := SimpleRESTStorage{}
ID := "id"
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -474,6 +479,26 @@ func TestDelete(t *testing.T) {
}
}
func TestDeleteInvokesAdmissionControl(t *testing.T) {
storage := map[string]RESTStorage{}
simpleStorage := SimpleRESTStorage{}
ID := "id"
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
request, err := http.NewRequest("DELETE", server.URL+"/prefix/version/simple/"+ID, nil)
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected response %#v", response)
}
}
func TestDeleteMissing(t *testing.T) {
storage := map[string]RESTStorage{}
ID := "id"
@@ -481,7 +506,7 @@ func TestDeleteMissing(t *testing.T) {
errors: map[string]error{"delete": apierrs.NewNotFound("simple", ID)},
}
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -506,7 +531,7 @@ func TestUpdate(t *testing.T) {
t: t,
expectedSet: "/prefix/version/simple/" + ID,
}
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -534,6 +559,39 @@ func TestUpdate(t *testing.T) {
}
}
func TestUpdateInvokesAdmissionControl(t *testing.T) {
storage := map[string]RESTStorage{}
simpleStorage := SimpleRESTStorage{}
ID := "id"
storage["simple"] = &simpleStorage
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/prefix/version/simple/" + ID,
}
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
server := httptest.NewServer(handler)
defer server.Close()
item := &Simple{
Other: "bar",
}
body, err := codec.Encode(item)
if err != nil {
// The following cases will fail, so die now
t.Fatalf("unexpected error: %v", err)
}
client := http.Client{}
request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body))
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected response %#v", response)
}
}
func TestUpdateMissing(t *testing.T) {
storage := map[string]RESTStorage{}
ID := "id"
@@ -541,7 +599,7 @@ func TestUpdateMissing(t *testing.T) {
errors: map[string]error{"update": apierrs.NewNotFound("simple", ID)},
}
storage["simple"] = &simpleStorage
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker)
handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -576,7 +634,7 @@ func TestCreate(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", testVersion, selfLinker)
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
@@ -612,6 +670,41 @@ func TestCreate(t *testing.T) {
wait.Done()
}
func TestCreateInvokesAdmissionControl(t *testing.T) {
wait := sync.WaitGroup{}
wait.Add(1)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
wait.Wait()
return &Simple{}, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
simple := &Simple{
Other: "foo",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected response %#v", response)
}
}
func TestCreateNotFound(t *testing.T) {
handler := Handle(map[string]RESTStorage{
"simple": &SimpleRESTStorage{
@@ -619,7 +712,7 @@ func TestCreateNotFound(t *testing.T) {
// See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092.
errors: map[string]error{"create": apierrs.NewNotFound("simple", "id")},
},
}, codec, "/prefix", testVersion, selfLinker)
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@@ -687,7 +780,7 @@ func TestSyncCreate(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": &storage,
}, codec, "/prefix", testVersion, selfLinker)
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@@ -760,7 +853,7 @@ func TestAsyncDelayReturnsError(t *testing.T) {
return nil, apierrs.NewAlreadyExists("foo", "bar")
},
}
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker)
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
server := httptest.NewServer(handler)
defer server.Close()
@@ -784,7 +877,7 @@ func TestAsyncCreateError(t *testing.T) {
name: "bar",
expectedSet: "/prefix/version/foo/bar",
}
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker)
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
@@ -884,7 +977,7 @@ func TestSyncCreateTimeout(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": &storage,
}, codec, "/prefix", testVersion, selfLinker)
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -916,7 +1009,7 @@ func TestCORSAllowedOrigins(t *testing.T) {
}
handler := CORS(
Handle(map[string]RESTStorage{}, codec, "/prefix", testVersion, selfLinker),
Handle(map[string]RESTStorage{}, codec, "/prefix", testVersion, selfLinker, admissionControl),
allowedOriginRegexps, nil, nil, "true",
)
server := httptest.NewServer(handler)

View File

@@ -113,7 +113,7 @@ func TestOperationsList(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker)
}, codec, "/prefix", "version", selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
@@ -170,7 +170,7 @@ func TestOpGet(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker)
}, codec, "/prefix", "version", selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()

View File

@@ -182,7 +182,7 @@ func TestProxy(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker)
}, codec, "/prefix", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()

View File

@@ -31,7 +31,7 @@ func TestRedirect(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker)
}, codec, "/prefix", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -84,7 +84,7 @@ func TestRedirectWithNamespaces(t *testing.T) {
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker)
}, codec, "/prefix", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()

View File

@@ -21,6 +21,7 @@ import (
"path"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@@ -31,12 +32,13 @@ import (
// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name.
type RESTHandler struct {
storage map[string]RESTStorage
codec runtime.Codec
canonicalPrefix string
selfLinker runtime.SelfLinker
ops *Operations
asyncOpWait time.Duration
storage map[string]RESTStorage
codec runtime.Codec
canonicalPrefix string
selfLinker runtime.SelfLinker
ops *Operations
asyncOpWait time.Duration
admissionControl admission.Interface
}
// ServeHTTP handles requests to all RESTStorage objects.
@@ -205,6 +207,14 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w)
return
}
// invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE"))
if err != nil {
errorJSON(err, h.codec, w)
return
}
out, err := storage.Create(ctx, obj)
if err != nil {
errorJSON(err, h.codec, w)
@@ -218,6 +228,14 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
notFound(w, req)
return
}
// invoke admission control
err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE"))
if err != nil {
errorJSON(err, h.codec, w)
return
}
out, err := storage.Delete(ctx, parts[1])
if err != nil {
errorJSON(err, h.codec, w)
@@ -242,6 +260,14 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w)
return
}
// invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE"))
if err != nil {
errorJSON(err, h.codec, w)
return
}
out, err := storage.Update(ctx, obj)
if err != nil {
errorJSON(err, h.codec, w)

View File

@@ -50,7 +50,7 @@ func TestWatchWebsocket(t *testing.T) {
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/api", "version", selfLinker)
}, codec, "/api", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -104,7 +104,7 @@ func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/api", "version", selfLinker)
}, codec, "/api", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@@ -167,7 +167,7 @@ func TestWatchParamParsing(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/api", "version", selfLinker)
}, codec, "/api", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
@@ -239,7 +239,7 @@ func TestWatchProtocolSelection(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/api", "version", selfLinker)
}, codec, "/api", "version", selfLinker, admissionControl)
server := httptest.NewServer(handler)
defer server.Close()
defer server.CloseClientConnections()