Add warnings capability for admission webhooks

This commit is contained in:
Jordan Liggitt 2020-06-30 16:27:56 -04:00
parent c53ce48d48
commit 5eef60a00a
10 changed files with 184 additions and 9 deletions

View File

@ -138,6 +138,12 @@ type AdmissionResponse struct {
// the admission webhook to add additional context to the audit log for this request.
// +optional
AuditAnnotations map[string]string
// warnings is a list of warning messages to return to the requesting API client.
// Warning messages describe a problem the client making the API request should correct or be aware of.
// Limit warnings to 120 characters if possible.
// Warnings over 256 characters and large numbers of warnings may be truncated.
// +optional
Warnings []string
}
// PatchType is the type of patch being used to represent the mutated object

View File

@ -140,6 +140,13 @@ type AdmissionResponse struct {
// the admission webhook to add additional context to the audit log for this request.
// +optional
AuditAnnotations map[string]string `json:"auditAnnotations,omitempty" protobuf:"bytes,6,opt,name=auditAnnotations"`
// warnings is a list of warning messages to return to the requesting API client.
// Warning messages describe a problem the client making the API request should correct or be aware of.
// Limit warnings to 120 characters if possible.
// Warnings over 256 characters and large numbers of warnings may be truncated.
// +optional
Warnings []string `json:"warnings,omitempty" protobuf:"bytes,7,rep,name=warnings"`
}
// PatchType is the type of patch being used to represent the mutated object

View File

@ -145,6 +145,13 @@ type AdmissionResponse struct {
// the admission webhook to add additional context to the audit log for this request.
// +optional
AuditAnnotations map[string]string `json:"auditAnnotations,omitempty" protobuf:"bytes,6,opt,name=auditAnnotations"`
// warnings is a list of warning messages to return to the requesting API client.
// Warning messages describe a problem the client making the API request should correct or be aware of.
// Limit warnings to 120 characters if possible.
// Warnings over 256 characters and large numbers of warnings may be truncated.
// +optional
Warnings []string `json:"warnings,omitempty" protobuf:"bytes,7,rep,name=warnings"`
}
// PatchType is the type of patch being used to represent the mutated object

View File

@ -43,6 +43,7 @@ import (
webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
utiltrace "k8s.io/utils/trace"
)
@ -267,6 +268,9 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss
klog.Warningf("Failed to set admission audit annotation %s to %s for mutating webhook %s: %v", key, v, h.Name, err)
}
}
for _, w := range result.Warnings {
warning.AddWarning(ctx, "", w)
}
if !result.Allowed {
return false, &webhookutil.ErrWebhookRejection{Status: webhookerrors.ToStatusErr(h.Name, result.Result)}

View File

@ -36,6 +36,7 @@ type AdmissionResponse struct {
Patch []byte
PatchType admissionv1.PatchType
Result *metav1.Status
Warnings []string
}
// VerifyAdmissionResponse checks the validity of the provided admission review object, and returns the
@ -93,6 +94,7 @@ func VerifyAdmissionResponse(uid types.UID, mutating bool, review runtime.Object
Patch: patch,
PatchType: patchType,
Result: r.Response.Result,
Warnings: r.Response.Warnings,
}, nil
case *admissionv1beta1.AdmissionReview:
@ -118,6 +120,7 @@ func VerifyAdmissionResponse(uid types.UID, mutating bool, review runtime.Object
Patch: patch,
PatchType: patchType,
Result: r.Response.Result,
Warnings: r.Response.Warnings,
}, nil
default:

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
@ -227,6 +228,9 @@ func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWeb
klog.Warningf("Failed to set admission audit annotation %s to %s for validating webhook %s: %v", key, v, h.Name, err)
}
}
for _, w := range result.Warnings {
warning.AddWarning(ctx, "", w)
}
if result.Allowed {
return nil
}

View File

@ -17,10 +17,13 @@ limitations under the License.
package filters
import (
"fmt"
"net/http"
"sync"
"unicode/utf8"
"k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/warning"
)
@ -33,10 +36,34 @@ func WithWarningRecorder(handler http.Handler) http.Handler {
})
}
var (
truncateAtTotalRunes = 4 * 1024
truncateItemRunes = 256
)
type recordedWarning struct {
agent string
text string
}
type recorder struct {
lock sync.Mutex
// lock guards calls to AddWarning from multiple threads
lock sync.Mutex
// recorded tracks whether AddWarning was already called with a given text
recorded map[string]bool
writer http.ResponseWriter
// ordered tracks warnings added so they can be replayed and truncated if needed
ordered []recordedWarning
// written tracks how many runes of text have been added as warning headers
written int
// truncating tracks if we have already exceeded truncateAtTotalRunes and are now truncating warning messages as we add them
truncating bool
// writer is the response writer to add warning headers to
writer http.ResponseWriter
}
func (r *recorder) AddWarning(agent, text string) {
@ -47,6 +74,11 @@ func (r *recorder) AddWarning(agent, text string) {
r.lock.Lock()
defer r.lock.Unlock()
// if we've already exceeded our limit and are already truncating, return early
if r.written >= truncateAtTotalRunes && r.truncating {
return
}
// init if needed
if r.recorded == nil {
r.recorded = map[string]bool{}
@ -57,13 +89,45 @@ func (r *recorder) AddWarning(agent, text string) {
return
}
r.recorded[text] = true
r.ordered = append(r.ordered, recordedWarning{agent: agent, text: text})
// TODO(liggitt): track total message characters written:
// * if this takes us over 4k truncate individual messages to 256 chars and regenerate headers
// * if we're already truncating truncate this message to 256 chars
// * if we're still over 4k omit this message
// truncate on a rune boundary, if needed
textRuneLength := utf8.RuneCountInString(text)
if r.truncating && textRuneLength > truncateItemRunes {
text = string([]rune(text)[:truncateItemRunes])
textRuneLength = truncateItemRunes
}
if header, err := net.NewWarningHeader(299, agent, text); err == nil {
// compute the header
header, err := net.NewWarningHeader(299, agent, text)
if err != nil {
return
}
// if this fits within our limit, or we're already truncating, write and return
if r.written+textRuneLength <= truncateAtTotalRunes || r.truncating {
r.written += textRuneLength
r.writer.Header().Add("Warning", header)
return
}
// otherwise, enable truncation, reset, and replay the existing items as truncated warnings
r.truncating = true
r.written = 0
r.writer.Header().Del("Warning")
utilruntime.HandleError(fmt.Errorf("exceeded max warning header size, truncating"))
for _, w := range r.ordered {
agent := w.agent
text := w.text
textRuneLength := utf8.RuneCountInString(text)
if textRuneLength > truncateItemRunes {
text = string([]rune(text)[:truncateItemRunes])
textRuneLength = truncateItemRunes
}
if header, err := net.NewWarningHeader(299, agent, text); err == nil {
r.written += textRuneLength
r.writer.Header().Add("Warning", header)
}
}
}

View File

@ -98,3 +98,35 @@ func Test_recorder_AddWarning(t *testing.T) {
})
}
}
func TestTruncation(t *testing.T) {
originalTotalRunes, originalItemRunes := truncateAtTotalRunes, truncateItemRunes
truncateAtTotalRunes, truncateItemRunes = 25, 5
defer func() {
truncateAtTotalRunes, truncateItemRunes = originalTotalRunes, originalItemRunes
}()
responseRecorder := httptest.NewRecorder()
warningRecorder := &recorder{writer: responseRecorder}
// add items longer than the individual length
warningRecorder.AddWarning("", "aaaaaaaaaa") // long item
warningRecorder.AddWarning("-", "aaaaaaaaaa") // duplicate item
warningRecorder.AddWarning("", "bb") // short item
warningRecorder.AddWarning("", "ccc") // short item
warningRecorder.AddWarning("", "Iñtërnâtiô") // long item
// check they are preserved
if e, a := []string{`299 - "aaaaaaaaaa"`, `299 - "bb"`, `299 - "ccc"`, `299 - "Iñtërnâtiô"`}, responseRecorder.Header()["Warning"]; !reflect.DeepEqual(e, a) {
t.Errorf("expected\n%#v\ngot\n%#v", e, a)
}
// add an item that exceeds the length and triggers truncation, reducing existing items to 15 runes
warningRecorder.AddWarning("", "e") // triggering item, 16 total
warningRecorder.AddWarning("", "ffffffffff") // long item to get truncated, 21 total
warningRecorder.AddWarning("", "ffffffffff") // duplicate item
warningRecorder.AddWarning("", "gggggggggg") // item to get truncated, 26 total
warningRecorder.AddWarning("", "h") // item to get ignored since we're over our limit
// check that existing items are truncated, and order preserved
if e, a := []string{`299 - "aaaaa"`, `299 - "bb"`, `299 - "ccc"`, `299 - "Iñtër"`, `299 - "e"`, `299 - "fffff"`, `299 - "ggggg"`}, responseRecorder.Header()["Warning"]; !reflect.DeepEqual(e, a) {
t.Errorf("expected\n%#v\ngot\n%#v", e, a)
}
}

View File

@ -75,6 +75,7 @@ func convertAdmissionResponseToV1(r *v1beta1.AdmissionResponse) *v1.AdmissionRes
Patch: r.Patch,
PatchType: pt,
Result: r.Result,
Warnings: r.Warnings,
}
}
@ -91,6 +92,7 @@ func convertAdmissionResponseToV1beta1(r *v1.AdmissionResponse) *v1beta1.Admissi
Patch: r.Patch,
PatchType: pt,
Result: r.Result,
Warnings: r.Warnings,
}
}

View File

@ -169,6 +169,8 @@ type holder struct {
t *testing.T
warningHandler *warningHandler
recordGVR metav1.GroupVersionResource
recordOperation string
recordNamespace string
@ -203,6 +205,7 @@ func (h *holder) reset(t *testing.T) {
h.expectOldObject = false
h.expectOptionsGVK = schema.GroupVersionKind{}
h.expectOptions = false
h.warningHandler.reset()
// Set up the recorded map with nil records for all combinations
h.recorded = map[webhookOptions]*admissionRequest{}
@ -231,6 +234,7 @@ func (h *holder) expect(gvr schema.GroupVersionResource, gvk, optionsGVK schema.
h.expectOldObject = oldObject
h.expectOptionsGVK = optionsGVK
h.expectOptions = options
h.warningHandler.reset()
// Set up the recorded map with nil records for all combinations
h.recorded = map[webhookOptions]*admissionRequest{}
@ -314,13 +318,15 @@ func (h *holder) verify(t *testing.T) {
defer h.lock.Unlock()
for options, value := range h.recorded {
if err := h.verifyRequest(options.converted, value); err != nil {
if err := h.verifyRequest(options, value); err != nil {
t.Errorf("version: %v, phase:%v, converted:%v error: %v", options.version, options.phase, options.converted, err)
}
}
}
func (h *holder) verifyRequest(converted bool, request *admissionRequest) error {
func (h *holder) verifyRequest(webhookOptions webhookOptions, request *admissionRequest) error {
converted := webhookOptions.converted
// Check if current resource should be exempted from Admission processing
if admissionExemptResources[gvr(h.recordGVR.Group, h.recordGVR.Version, h.recordGVR.Resource)] {
if request == nil {
@ -357,6 +363,10 @@ func (h *holder) verifyRequest(converted bool, request *admissionRequest) error
return fmt.Errorf("unexpected options: %#v", request.Options.Object)
}
if !h.warningHandler.hasWarning(makeWarning(webhookOptions.version, webhookOptions.phase, webhookOptions.converted)) {
return fmt.Errorf("no warning received from webhook")
}
return nil
}
@ -384,6 +394,34 @@ func (h *holder) verifyOptions(options runtime.Object) error {
return nil
}
type warningHandler struct {
lock sync.Mutex
warnings map[string]bool
}
func (w *warningHandler) reset() {
w.lock.Lock()
defer w.lock.Unlock()
w.warnings = map[string]bool{}
}
func (w *warningHandler) hasWarning(warning string) bool {
w.lock.Lock()
defer w.lock.Unlock()
return w.warnings[warning]
}
func makeWarning(version string, phase string, converted bool) string {
return fmt.Sprintf("%v/%v/%v", version, phase, converted)
}
func (w *warningHandler) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
w.lock.Lock()
defer w.lock.Unlock()
w.warnings[message] = true
}
// TestWebhookAdmissionWithWatchCache tests communication between API server and webhook process.
func TestWebhookAdmissionWithWatchCache(t *testing.T) {
testWebhookAdmission(t, true)
@ -399,6 +437,7 @@ func testWebhookAdmission(t *testing.T, watchCache bool) {
// holder communicates expectations to webhooks, and results from webhooks
holder := &holder{
t: t,
warningHandler: &warningHandler{warnings: map[string]bool{}},
gvrToConvertedGVR: map[metav1.GroupVersionResource]metav1.GroupVersionResource{},
gvrToConvertedGVK: map[metav1.GroupVersionResource]schema.GroupVersionKind{},
}
@ -451,6 +490,7 @@ func testWebhookAdmission(t *testing.T, watchCache bool) {
clientConfig := rest.CopyConfig(server.ClientConfig)
clientConfig.Impersonate.UserName = testClientUsername
clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"}
clientConfig.WarningHandler = holder.warningHandler
client, err := clientset.NewForConfig(clientConfig)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -1265,6 +1305,9 @@ func newV1beta1WebhookHandler(t *testing.T, holder *holder, phase string, conver
review.Kind = ""
review.Response.UID = ""
// test plumbing warnings back to the client
review.Response.Warnings = []string{makeWarning("v1beta1", phase, converted)}
// If we're mutating, and have an object, return a patch to exercise conversion
if phase == mutation && len(review.Request.Object.Raw) > 0 {
review.Response.Patch = []byte(`[{"op":"add","path":"/foo","value":"test"}]`)
@ -1355,6 +1398,9 @@ func newV1WebhookHandler(t *testing.T, holder *holder, phase string, converted b
Allowed: true,
UID: review.Request.UID,
Result: &metav1.Status{Message: "admitted"},
// test plumbing warnings back
Warnings: []string{makeWarning("v1", phase, converted)},
}
// If we're mutating, and have an object, return a patch to exercise conversion
if phase == mutation && len(review.Request.Object.Raw) > 0 {