mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
Merge pull request #44786 from tsandall/f8n-scheduling-policy
Automatic merge from submit-queue (batch tested with PRs 46235, 44786, 46833, 46756, 46669) federation: Add admission controller for policy-based placement @nikhiljindal Here's the initial version of the scheduling policy admission controller. It's at the point where it would benefit from having another pair of eyes look at it. The main thing I'm unsure of is the serialization of Kube resources for the webhook/query call. Release Note: ``` The federation-apiserver now supports a SchedulingPolicy admission controller that enables policy-based control over placement of federated resources. ``` Ref #39982
This commit is contained in:
commit
eae59aaf72
@ -30,6 +30,7 @@ filegroup(
|
||||
"//federation/pkg/federatedtypes:all-srcs",
|
||||
"//federation/pkg/federation-controller:all-srcs",
|
||||
"//federation/pkg/kubefed:all-srcs",
|
||||
"//federation/plugin/pkg/admission/schedulingpolicy:all-srcs",
|
||||
"//federation/registry/cluster:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
|
@ -28,6 +28,7 @@ go_library(
|
||||
"//federation/apis/federation/install:go_default_library",
|
||||
"//federation/apis/federation/v1beta1:go_default_library",
|
||||
"//federation/cmd/federation-apiserver/app/options:go_default_library",
|
||||
"//federation/plugin/pkg/admission/schedulingpolicy:go_default_library",
|
||||
"//federation/registry/cluster/etcd:go_default_library",
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/install:go_default_library",
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
// Admission policies
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/kubernetes/federation/plugin/pkg/admission/schedulingpolicy"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/admit"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/deny"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/gc"
|
||||
@ -37,4 +38,5 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) {
|
||||
deny.Register(plugins)
|
||||
gc.Register(plugins)
|
||||
initialization.Register(plugins)
|
||||
schedulingpolicy.Register(plugins)
|
||||
}
|
||||
|
70
federation/plugin/pkg/admission/schedulingpolicy/BUILD
Normal file
70
federation/plugin/pkg/admission/schedulingpolicy/BUILD
Normal file
@ -0,0 +1,70 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"admission_test.go",
|
||||
"merge_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/apis/extensions/v1beta1:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"admission.go",
|
||||
"merge.go",
|
||||
"query.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/ref:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
|
||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
213
federation/plugin/pkg/admission/schedulingpolicy/admission.go
Normal file
213
federation/plugin/pkg/admission/schedulingpolicy/admission.go
Normal file
@ -0,0 +1,213 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package schedulingpolicy implements a webhook that queries an external API
|
||||
// to obtain scheduling decisions for Federated sources.
|
||||
package schedulingpolicy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/yaml"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/ref"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
)
|
||||
|
||||
const (
|
||||
pluginName = "SchedulingPolicy"
|
||||
configKey = "schedulingPolicy"
|
||||
policyConfigMapNamespace = "kube-federation-scheduling-policy"
|
||||
|
||||
// Default backoff delay for policy engine query retries. The actual
|
||||
// backoff implementation is handled by k8s.io/apiserver/pkg/util/webhook.
|
||||
// If the admission controller config file does not specify a backoff, this
|
||||
// one is used.
|
||||
defaultRetryBackoff = time.Millisecond * 100
|
||||
)
|
||||
|
||||
type admissionConfig struct {
|
||||
Kubeconfig string `json:"kubeconfig"`
|
||||
RetryBackoff time.Duration `json:"retryBackoff"`
|
||||
}
|
||||
|
||||
type admissionController struct {
|
||||
*admission.Handler
|
||||
policyEngineClient *rest.RESTClient // client to communicate with policy engine
|
||||
policyEngineRetryBackoff time.Duration // backoff for policy engine queries
|
||||
client internalclientset.Interface // client to communicate with federation-apiserver
|
||||
}
|
||||
|
||||
// Register registers the plugin.
|
||||
func Register(plugins *admission.Plugins) {
|
||||
plugins.Register(pluginName, func(file io.Reader) (admission.Interface, error) {
|
||||
return newAdmissionController(file)
|
||||
})
|
||||
}
|
||||
|
||||
func newAdmissionController(file io.Reader) (*admissionController, error) {
|
||||
config, err := loadConfig(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
policyEngineClient, err := loadRestClient(config.Kubeconfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &admissionController{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
policyEngineClient: policyEngineClient,
|
||||
policyEngineRetryBackoff: config.RetryBackoff,
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *admissionController) Validate() error {
|
||||
if c.client == nil {
|
||||
return fmt.Errorf("%s requires a client", pluginName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *admissionController) SetInternalKubeClientSet(client internalclientset.Interface) {
|
||||
c.client = client
|
||||
}
|
||||
|
||||
func (c *admissionController) Admit(a admission.Attributes) (err error) {
|
||||
exists, err := c.policyExists()
|
||||
if err != nil {
|
||||
return c.handleError(a, err)
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
obj := a.GetObject()
|
||||
decision, err := newPolicyEngineQuery(c.policyEngineClient, c.policyEngineRetryBackoff, obj, a.GetKind()).Do()
|
||||
|
||||
if err != nil {
|
||||
return c.handleError(a, err)
|
||||
}
|
||||
|
||||
if err := decision.Error(); err != nil {
|
||||
return c.handleError(a, err)
|
||||
}
|
||||
|
||||
mergeAnnotations(obj, decision.Annotations)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *admissionController) handleError(a admission.Attributes, err error) error {
|
||||
|
||||
c.publishEvent(a, err.Error())
|
||||
|
||||
return admission.NewForbidden(a, err)
|
||||
}
|
||||
|
||||
func (c *admissionController) publishEvent(a admission.Attributes, msg string) {
|
||||
|
||||
obj := a.GetObject()
|
||||
|
||||
ref, err := ref.GetReference(api.Scheme, obj)
|
||||
if err != nil {
|
||||
runtime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
event := &api.Event{
|
||||
InvolvedObject: *ref,
|
||||
Message: msg,
|
||||
Source: api.EventSource{
|
||||
Component: fmt.Sprintf("schedulingpolicy"),
|
||||
},
|
||||
Type: "Warning",
|
||||
}
|
||||
|
||||
if _, err := c.client.Core().Events(a.GetNamespace()).Create(event); err != nil {
|
||||
runtime.HandleError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *admissionController) policyExists() (bool, error) {
|
||||
lst, err := c.client.Core().ConfigMaps(policyConfigMapNamespace).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
return len(lst.Items) > 0, nil
|
||||
}
|
||||
|
||||
func loadConfig(file io.Reader) (*admissionConfig, error) {
|
||||
var cfg admissionConfig
|
||||
if file == nil {
|
||||
return nil, fmt.Errorf("--admission-control-config-file not specified or invalid")
|
||||
}
|
||||
|
||||
if err := yaml.NewYAMLOrJSONDecoder(file, 4096).Decode(&cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(cfg.Kubeconfig) == 0 {
|
||||
return nil, fmt.Errorf("kubeconfig path must not be empty")
|
||||
}
|
||||
|
||||
if cfg.RetryBackoff == 0 {
|
||||
cfg.RetryBackoff = defaultRetryBackoff
|
||||
} else {
|
||||
// Scale up value from config (which is unmarshalled as ns).
|
||||
cfg.RetryBackoff *= time.Millisecond
|
||||
}
|
||||
|
||||
if cfg.RetryBackoff.Nanoseconds() < 0 {
|
||||
return nil, fmt.Errorf("retryBackoff must not be negative")
|
||||
}
|
||||
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
func loadRestClient(kubeConfigFile string) (*rest.RESTClient, error) {
|
||||
|
||||
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
|
||||
loadingRules.ExplicitPath = kubeConfigFile
|
||||
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})
|
||||
|
||||
clientConfig, err := loader.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clientConfig.ContentConfig.NegotiatedSerializer = dynamic.ContentConfig().NegotiatedSerializer
|
||||
|
||||
restClient, err := rest.UnversionedRESTClientFor(clientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return restClient, nil
|
||||
}
|
@ -0,0 +1,473 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package schedulingpolicy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||
)
|
||||
|
||||
func TestNewAdmissionController(t *testing.T) {
|
||||
tempfile, err := ioutil.TempFile("", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error while creating temporary file: %v", err)
|
||||
}
|
||||
p := tempfile.Name()
|
||||
defer os.Remove(p)
|
||||
|
||||
kubeconfig := `
|
||||
clusters:
|
||||
- name: foo
|
||||
cluster:
|
||||
server: https://example.com
|
||||
users:
|
||||
- name: alice
|
||||
user:
|
||||
token: deadbeef
|
||||
contexts:
|
||||
- name: default
|
||||
context:
|
||||
cluster: foo
|
||||
user: alice
|
||||
current-context: default
|
||||
`
|
||||
|
||||
if _, err := tempfile.WriteString(kubeconfig); err != nil {
|
||||
t.Fatalf("Unexpected error while writing test kubeconfig file: %v", err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
note string
|
||||
input string
|
||||
wantErr bool
|
||||
}{
|
||||
{"no config", "", true},
|
||||
{"bad json", `{"foo": `, true},
|
||||
{"bad yaml", `{foo" `, true},
|
||||
{
|
||||
"missing kubeconfig",
|
||||
`{"foo": {}}`,
|
||||
true,
|
||||
},
|
||||
{
|
||||
"kubeconfig not found",
|
||||
`{
|
||||
"kubeconfig": "/kube-federation-scheduling-policy-file-not-found-test"
|
||||
}`,
|
||||
true,
|
||||
},
|
||||
{
|
||||
"bad retry backoff",
|
||||
fmt.Sprintf(`
|
||||
{
|
||||
"kubeconfig": %q,
|
||||
"retryBackoff": -1
|
||||
}
|
||||
`, p),
|
||||
true,
|
||||
},
|
||||
{
|
||||
"a valid config",
|
||||
fmt.Sprintf(`
|
||||
{
|
||||
"kubeconfig": %q
|
||||
}
|
||||
`, p),
|
||||
false,
|
||||
},
|
||||
{
|
||||
"a valid config with retry backoff",
|
||||
fmt.Sprintf(`
|
||||
{
|
||||
"kubeconfig": %q,
|
||||
"retryBackoff": 200
|
||||
}
|
||||
`, p),
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
var file io.Reader
|
||||
if tc.input == "" {
|
||||
file = nil
|
||||
} else {
|
||||
file = bytes.NewBufferString(tc.input)
|
||||
}
|
||||
|
||||
_, err := newAdmissionController(file)
|
||||
|
||||
if tc.wantErr && err == nil {
|
||||
t.Errorf("%v: Expected error", tc.note)
|
||||
} else if !tc.wantErr && err != nil {
|
||||
t.Errorf("%v: Unexpected error: %v", tc.note, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdmitQueryPayload(t *testing.T) {
|
||||
var body interface{}
|
||||
|
||||
serve := func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
t.Fatalf("Unexpected error reading admission payload: %v", err)
|
||||
}
|
||||
|
||||
// No errors or annotations.
|
||||
w.Write([]byte(`{}`))
|
||||
}
|
||||
|
||||
controller, err := newControllerWithTestServer(serve, true)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
|
||||
}
|
||||
|
||||
rs := makeReplicaSet()
|
||||
rs.Spec.MinReadySeconds = 100
|
||||
attrs := makeAdmissionRecord(rs)
|
||||
err = controller.Admit(attrs)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error from admission controller: %v", err)
|
||||
}
|
||||
|
||||
obj := body.(map[string]interface{})
|
||||
metadata := obj["metadata"].(map[string]interface{})
|
||||
spec := obj["spec"].(map[string]interface{})
|
||||
name := metadata["name"].(string)
|
||||
minReadySeconds := spec["minReadySeconds"].(float64)
|
||||
|
||||
expectedName := "myapp"
|
||||
if name != expectedName {
|
||||
t.Fatalf("Expected replicaset.metadata.name to be %v but got: %v", expectedName, name)
|
||||
}
|
||||
|
||||
expectedMinReadySeconds := float64(100)
|
||||
if minReadySeconds != expectedMinReadySeconds {
|
||||
t.Fatalf("Expected replicaset.spec.minReadySeconds to be %v but got: %v", expectedMinReadySeconds, minReadySeconds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdmitFailInternal(t *testing.T) {
|
||||
serve := func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
}
|
||||
|
||||
controller, err := newControllerWithTestServer(serve, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
|
||||
}
|
||||
|
||||
mockClient := &fake.Clientset{}
|
||||
mockClient.AddReactor("list", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, nil, fmt.Errorf("unknown error")
|
||||
})
|
||||
|
||||
controller.SetInternalKubeClientSet(mockClient)
|
||||
|
||||
attrs := makeAdmissionRecord(makeReplicaSet())
|
||||
err = controller.Admit(attrs)
|
||||
|
||||
if err == nil {
|
||||
t.Fatalf("Expected admission controller to fail closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdmitPolicyDoesNotExist(t *testing.T) {
|
||||
|
||||
controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(404)
|
||||
}, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
|
||||
}
|
||||
|
||||
attrs := makeAdmissionRecord(makeReplicaSet())
|
||||
err = controller.Admit(attrs)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Expected admission controller to fail open but got error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdmitFailClosed(t *testing.T) {
|
||||
tests := []struct {
|
||||
note string
|
||||
statusCode int
|
||||
body string
|
||||
}{
|
||||
{"server error", 500, ""},
|
||||
{"unmarshal error", 200, "{"},
|
||||
{"undefined result", 404, ``},
|
||||
{"policy errors", 200, `{"errors": ["conflicting replica-set-preferences"]}`},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
|
||||
serve := func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(tc.statusCode)
|
||||
if len(tc.body) > 0 {
|
||||
w.Write([]byte(tc.body))
|
||||
}
|
||||
}
|
||||
|
||||
controller, err := newControllerWithTestServer(serve, true)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("%v: Unexpected error while creating test admission controller/server: %v", tc.note, err)
|
||||
continue
|
||||
}
|
||||
|
||||
obj := makeReplicaSet()
|
||||
attrs := admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{})
|
||||
err = controller.Admit(attrs)
|
||||
if err == nil {
|
||||
t.Errorf("%v: Expected admission controller to fail closed", tc.note)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdmitRetries(t *testing.T) {
|
||||
var numQueries int
|
||||
|
||||
controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
numQueries++
|
||||
}, true)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
|
||||
}
|
||||
|
||||
err = controller.Admit(makeAdmissionRecord(makeReplicaSet()))
|
||||
|
||||
if err == nil {
|
||||
t.Fatalf("Expected admission controller to fail closed")
|
||||
}
|
||||
|
||||
if numQueries <= 1 {
|
||||
t.Fatalf("Expected multiple queries/retries but got (numQueries): %v", numQueries)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdmitSuccessWithAnnotationMerge(t *testing.T) {
|
||||
controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte(`
|
||||
{
|
||||
"annotations": {
|
||||
"foo": "bar-2"
|
||||
}
|
||||
}
|
||||
`))
|
||||
}, true)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
|
||||
}
|
||||
|
||||
obj := makeReplicaSet()
|
||||
obj.Annotations = map[string]string{}
|
||||
obj.Annotations["foo"] = "bar"
|
||||
obj.Annotations["bar"] = "baz"
|
||||
|
||||
attrs := admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{})
|
||||
err = controller.Admit(attrs)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error from admission controller: %v", err)
|
||||
}
|
||||
|
||||
annotations := attrs.GetObject().(*extensionsv1.ReplicaSet).Annotations
|
||||
expected := map[string]string{
|
||||
"foo": "bar-2",
|
||||
"bar": "baz",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(annotations, expected) {
|
||||
t.Fatalf("Expected annotations to be %v but got: %v", expected, annotations)
|
||||
}
|
||||
}
|
||||
|
||||
func newControllerWithTestServer(f func(w http.ResponseWriter, r *http.Request), policiesExist bool) (*admissionController, error) {
|
||||
server, err := newTestServer(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kubeConfigFile, err := makeKubeConfigFile(server.URL, "/some/path/to/decision")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer os.Remove(kubeConfigFile)
|
||||
|
||||
configFile, err := makeAdmissionControlConfigFile(kubeConfigFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer os.Remove(configFile)
|
||||
|
||||
file, err := os.Open(configFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
controller, err := newAdmissionController(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mockClient := &fake.Clientset{}
|
||||
|
||||
var items []api.ConfigMap
|
||||
|
||||
if policiesExist {
|
||||
items = append(items, api.ConfigMap{})
|
||||
}
|
||||
|
||||
mockClient.AddReactor("list", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
|
||||
if action.GetNamespace() == policyConfigMapNamespace {
|
||||
return true, &api.ConfigMapList{Items: items}, nil
|
||||
}
|
||||
return true, nil, nil
|
||||
})
|
||||
|
||||
controller.SetInternalKubeClientSet(mockClient)
|
||||
|
||||
return controller, nil
|
||||
}
|
||||
|
||||
func newTestServer(f func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, error) {
|
||||
server := httptest.NewUnstartedServer(http.HandlerFunc(f))
|
||||
server.Start()
|
||||
return server, nil
|
||||
}
|
||||
|
||||
func makeAdmissionControlConfigFile(kubeConfigFile string) (string, error) {
|
||||
tempfile, err := ioutil.TempFile("", "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
p := tempfile.Name()
|
||||
|
||||
configFileTmpl := `
|
||||
kubeconfig: {{ .KubeConfigFile }}
|
||||
retryBackoff: {{ .RetryBackoff }}
|
||||
`
|
||||
type configFileTemplateInput struct {
|
||||
KubeConfigFile string
|
||||
RetryBackoff int
|
||||
}
|
||||
|
||||
input := configFileTemplateInput{
|
||||
KubeConfigFile: kubeConfigFile,
|
||||
RetryBackoff: 1,
|
||||
}
|
||||
|
||||
tmpl, err := template.New("scheduling-policy-config").Parse(configFileTmpl)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := tmpl.Execute(tempfile, input); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func makeKubeConfigFile(baseURL, path string) (string, error) {
|
||||
tempfile, err := ioutil.TempFile("", "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
p := tempfile.Name()
|
||||
|
||||
kubeConfigTmpl := `
|
||||
clusters:
|
||||
- name: test
|
||||
cluster:
|
||||
server: {{ .BaseURL }}{{ .Path }}
|
||||
users:
|
||||
- name: alice
|
||||
user:
|
||||
token: deadbeef
|
||||
contexts:
|
||||
- name: default
|
||||
context:
|
||||
cluster: test
|
||||
user: alice
|
||||
current-context: default`
|
||||
|
||||
type kubeConfigTemplateInput struct {
|
||||
BaseURL string
|
||||
Path string
|
||||
}
|
||||
|
||||
input := kubeConfigTemplateInput{
|
||||
BaseURL: baseURL,
|
||||
Path: path,
|
||||
}
|
||||
|
||||
tmpl, err := template.New("kubeconfig").Parse(kubeConfigTmpl)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := tmpl.Execute(tempfile, input); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func makeAdmissionRecord(obj *extensionsv1.ReplicaSet) admission.Attributes {
|
||||
return admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{})
|
||||
}
|
||||
|
||||
func makeReplicaSet() *extensionsv1.ReplicaSet {
|
||||
return &extensionsv1.ReplicaSet{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "ReplicaSet",
|
||||
APIVersion: "extensions/v1beta1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "myapp",
|
||||
},
|
||||
Spec: extensionsv1.ReplicaSetSpec{},
|
||||
}
|
||||
}
|
45
federation/plugin/pkg/admission/schedulingpolicy/merge.go
Normal file
45
federation/plugin/pkg/admission/schedulingpolicy/merge.go
Normal file
@ -0,0 +1,45 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package schedulingpolicy
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// mergeAnnotations updates obj so that the provided annotations supersede the
|
||||
// existing annotations.
|
||||
func mergeAnnotations(obj runtime.Object, annotations map[string]string) error {
|
||||
if len(annotations) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
orig := accessor.GetAnnotations()
|
||||
for k := range orig {
|
||||
if _, ok := annotations[k]; !ok {
|
||||
annotations[k] = orig[k]
|
||||
}
|
||||
}
|
||||
|
||||
accessor.SetAnnotations(annotations)
|
||||
return nil
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package schedulingpolicy
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestMergeAnnotations(t *testing.T) {
|
||||
tests := []struct {
|
||||
note string
|
||||
input *v1.Pod
|
||||
annotations string
|
||||
expected string
|
||||
}{
|
||||
{"nil annotations", &v1.Pod{}, `{"foo": "bar"}`, `{"foo": "bar"}`},
|
||||
{"empty annotations", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}}, `{"foo": "bar"}`, `{"foo": "bar"}`},
|
||||
{"existing annotation", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "baz"}}}, `{"foo": "bar"}`, `{"foo": "bar"}`},
|
||||
{"different annotation", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"baz": "qux"}}}, `{"foo": "bar"}`, `{"baz": "qux", "foo": "bar"}`},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
|
||||
annotations := map[string]string{}
|
||||
|
||||
if err := json.Unmarshal([]byte(tc.annotations), &annotations); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
expected := map[string]string{}
|
||||
|
||||
if err := json.Unmarshal([]byte(tc.expected), &expected); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err := mergeAnnotations(tc.input, annotations)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tc.input.ObjectMeta.Annotations, expected) {
|
||||
t.Errorf("%v: Expected annotations to equal %v but got: %v", tc.note, expected, tc.input.ObjectMeta.Annotations)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
145
federation/plugin/pkg/admission/schedulingpolicy/query.go
Normal file
145
federation/plugin/pkg/admission/schedulingpolicy/query.go
Normal file
@ -0,0 +1,145 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package schedulingpolicy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
// policyUndefinedError represents an undefined response from the policy
|
||||
// engine. This typically means the relevant policy has not been loaded into
|
||||
// the engine.
|
||||
type policyUndefinedError struct{}
|
||||
|
||||
func (policyUndefinedError) Error() string {
|
||||
return "policy decision is undefined"
|
||||
}
|
||||
|
||||
// policyEngineQuery represents a single query against the policy engine.
|
||||
type policyEngineQuery struct {
|
||||
client *rest.RESTClient
|
||||
retryBackoff time.Duration
|
||||
obj runtime.Object
|
||||
gvk schema.GroupVersionKind
|
||||
}
|
||||
|
||||
// newPolicyEngineQuery returns a policyEngineQuery that can be executed.
|
||||
func newPolicyEngineQuery(client *rest.RESTClient, retryBackoff time.Duration, obj runtime.Object, gvk schema.GroupVersionKind) *policyEngineQuery {
|
||||
return &policyEngineQuery{
|
||||
client: client,
|
||||
retryBackoff: retryBackoff,
|
||||
obj: obj,
|
||||
gvk: gvk,
|
||||
}
|
||||
}
|
||||
|
||||
// Do returns the result of the policy engine query. If the policy decision is
|
||||
// undefined or an unknown error occurs, err is non-nil. Otherwise, result is
|
||||
// non-nil and contains the result of policy evaluation.
|
||||
func (query *policyEngineQuery) Do() (decision *policyDecision, err error) {
|
||||
|
||||
bs, err := query.encode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result rest.Result
|
||||
|
||||
err = webhook.WithExponentialBackoff(query.retryBackoff, func() error {
|
||||
result = query.client.Post().
|
||||
Body(bs).
|
||||
Do()
|
||||
return result.Error()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return nil, policyUndefinedError{}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return decodeResult(result)
|
||||
}
|
||||
|
||||
// encode returns the encoded version of the query's runtime.Object.
|
||||
func (query *policyEngineQuery) encode() ([]byte, error) {
|
||||
|
||||
var info runtime.SerializerInfo
|
||||
infos := api.Codecs.SupportedMediaTypes()
|
||||
|
||||
for i := range infos {
|
||||
if infos[i].MediaType == "application/json" {
|
||||
info = infos[i]
|
||||
}
|
||||
}
|
||||
|
||||
if info.Serializer == nil {
|
||||
return nil, fmt.Errorf("serialization not supported")
|
||||
}
|
||||
|
||||
codec := api.Codecs.EncoderForVersion(info.Serializer, query.gvk.GroupVersion())
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := codec.Encode(query.obj, &buf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// policyDecision represents a response from the policy engine.
|
||||
type policyDecision struct {
|
||||
Errors []string `json:"errors,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
}
|
||||
|
||||
// Error returns an error if the policy raised an error.
|
||||
func (d *policyDecision) Error() error {
|
||||
if len(d.Errors) == 0 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("reason(s): %v", strings.Join(d.Errors, "; "))
|
||||
}
|
||||
|
||||
func decodeResult(result rest.Result) (*policyDecision, error) {
|
||||
|
||||
bs, err := result.Raw()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(bs)
|
||||
var decision policyDecision
|
||||
|
||||
if err := json.NewDecoder(buf).Decode(&decision); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &decision, nil
|
||||
}
|
@ -51,6 +51,7 @@ federation/cmd/federation-controller-manager
|
||||
federation/cmd/genfeddocs
|
||||
federation/cmd/kubefed
|
||||
federation/pkg/federation-controller/util/replicapreferences
|
||||
federation/plugin/pkg/admission/schedulingpolicy
|
||||
hack
|
||||
hack/boilerplate/test
|
||||
hack/cmd/teststale
|
||||
|
Loading…
Reference in New Issue
Block a user