diff --git a/pkg/apis/admission/types.go b/pkg/apis/admission/types.go index 411ac03598a..14051f0169c 100644 --- a/pkg/apis/admission/types.go +++ b/pkg/apis/admission/types.go @@ -138,6 +138,12 @@ type AdmissionResponse struct { // the admission webhook to add additional context to the audit log for this request. // +optional AuditAnnotations map[string]string + // warnings is a list of warning messages to return to the requesting API client. + // Warning messages describe a problem the client making the API request should correct or be aware of. + // Limit warnings to 120 characters if possible. + // Warnings over 256 characters and large numbers of warnings may be truncated. + // +optional + Warnings []string } // PatchType is the type of patch being used to represent the mutated object diff --git a/staging/src/k8s.io/api/admission/v1/types.go b/staging/src/k8s.io/api/admission/v1/types.go index a40cb0d52e2..556fd1ad54d 100644 --- a/staging/src/k8s.io/api/admission/v1/types.go +++ b/staging/src/k8s.io/api/admission/v1/types.go @@ -140,6 +140,13 @@ type AdmissionResponse struct { // the admission webhook to add additional context to the audit log for this request. // +optional AuditAnnotations map[string]string `json:"auditAnnotations,omitempty" protobuf:"bytes,6,opt,name=auditAnnotations"` + + // warnings is a list of warning messages to return to the requesting API client. + // Warning messages describe a problem the client making the API request should correct or be aware of. + // Limit warnings to 120 characters if possible. + // Warnings over 256 characters and large numbers of warnings may be truncated. + // +optional + Warnings []string `json:"warnings,omitempty" protobuf:"bytes,7,rep,name=warnings"` } // PatchType is the type of patch being used to represent the mutated object diff --git a/staging/src/k8s.io/api/admission/v1beta1/types.go b/staging/src/k8s.io/api/admission/v1beta1/types.go index 92017b345ff..00c619d9986 100644 --- a/staging/src/k8s.io/api/admission/v1beta1/types.go +++ b/staging/src/k8s.io/api/admission/v1beta1/types.go @@ -145,6 +145,13 @@ type AdmissionResponse struct { // the admission webhook to add additional context to the audit log for this request. // +optional AuditAnnotations map[string]string `json:"auditAnnotations,omitempty" protobuf:"bytes,6,opt,name=auditAnnotations"` + + // warnings is a list of warning messages to return to the requesting API client. + // Warning messages describe a problem the client making the API request should correct or be aware of. + // Limit warnings to 120 characters if possible. + // Warnings over 256 characters and large numbers of warnings may be truncated. + // +optional + Warnings []string `json:"warnings,omitempty" protobuf:"bytes,7,rep,name=warnings"` } // PatchType is the type of patch being used to represent the mutated object diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go index c1a961340f0..1fbe9d718b7 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go @@ -43,6 +43,7 @@ import ( webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request" auditinternal "k8s.io/apiserver/pkg/apis/audit" webhookutil "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/apiserver/pkg/warning" utiltrace "k8s.io/utils/trace" ) @@ -267,6 +268,9 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss klog.Warningf("Failed to set admission audit annotation %s to %s for mutating webhook %s: %v", key, v, h.Name, err) } } + for _, w := range result.Warnings { + warning.AddWarning(ctx, "", w) + } if !result.Allowed { return false, &webhookutil.ErrWebhookRejection{Status: webhookerrors.ToStatusErr(h.Name, result.Result)} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/request/admissionreview.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/request/admissionreview.go index 5ea28d446b5..c60d0fb9e75 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/request/admissionreview.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/request/admissionreview.go @@ -36,6 +36,7 @@ type AdmissionResponse struct { Patch []byte PatchType admissionv1.PatchType Result *metav1.Status + Warnings []string } // VerifyAdmissionResponse checks the validity of the provided admission review object, and returns the @@ -93,6 +94,7 @@ func VerifyAdmissionResponse(uid types.UID, mutating bool, review runtime.Object Patch: patch, PatchType: patchType, Result: r.Response.Result, + Warnings: r.Response.Warnings, }, nil case *admissionv1beta1.AdmissionReview: @@ -118,6 +120,7 @@ func VerifyAdmissionResponse(uid types.UID, mutating bool, review runtime.Object Patch: patch, PatchType: patchType, Result: r.Response.Result, + Warnings: r.Response.Warnings, }, nil default: diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/dispatcher.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/dispatcher.go index 50454b99934..f065cdf5014 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/dispatcher.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/dispatcher.go @@ -33,6 +33,7 @@ import ( "k8s.io/apiserver/pkg/admission/plugin/webhook/generic" webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request" webhookutil "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/apiserver/pkg/warning" "k8s.io/klog/v2" utiltrace "k8s.io/utils/trace" ) @@ -227,6 +228,9 @@ func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWeb klog.Warningf("Failed to set admission audit annotation %s to %s for validating webhook %s: %v", key, v, h.Name, err) } } + for _, w := range result.Warnings { + warning.AddWarning(ctx, "", w) + } if result.Allowed { return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning.go index 323f634dcbf..55e85f0b73b 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning.go @@ -17,10 +17,13 @@ limitations under the License. package filters import ( + "fmt" "net/http" "sync" + "unicode/utf8" "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/warning" ) @@ -33,10 +36,34 @@ func WithWarningRecorder(handler http.Handler) http.Handler { }) } +var ( + truncateAtTotalRunes = 4 * 1024 + truncateItemRunes = 256 +) + +type recordedWarning struct { + agent string + text string +} + type recorder struct { - lock sync.Mutex + // lock guards calls to AddWarning from multiple threads + lock sync.Mutex + + // recorded tracks whether AddWarning was already called with a given text recorded map[string]bool - writer http.ResponseWriter + + // ordered tracks warnings added so they can be replayed and truncated if needed + ordered []recordedWarning + + // written tracks how many runes of text have been added as warning headers + written int + + // truncating tracks if we have already exceeded truncateAtTotalRunes and are now truncating warning messages as we add them + truncating bool + + // writer is the response writer to add warning headers to + writer http.ResponseWriter } func (r *recorder) AddWarning(agent, text string) { @@ -47,6 +74,11 @@ func (r *recorder) AddWarning(agent, text string) { r.lock.Lock() defer r.lock.Unlock() + // if we've already exceeded our limit and are already truncating, return early + if r.written >= truncateAtTotalRunes && r.truncating { + return + } + // init if needed if r.recorded == nil { r.recorded = map[string]bool{} @@ -57,13 +89,45 @@ func (r *recorder) AddWarning(agent, text string) { return } r.recorded[text] = true + r.ordered = append(r.ordered, recordedWarning{agent: agent, text: text}) - // TODO(liggitt): track total message characters written: - // * if this takes us over 4k truncate individual messages to 256 chars and regenerate headers - // * if we're already truncating truncate this message to 256 chars - // * if we're still over 4k omit this message + // truncate on a rune boundary, if needed + textRuneLength := utf8.RuneCountInString(text) + if r.truncating && textRuneLength > truncateItemRunes { + text = string([]rune(text)[:truncateItemRunes]) + textRuneLength = truncateItemRunes + } - if header, err := net.NewWarningHeader(299, agent, text); err == nil { + // compute the header + header, err := net.NewWarningHeader(299, agent, text) + if err != nil { + return + } + + // if this fits within our limit, or we're already truncating, write and return + if r.written+textRuneLength <= truncateAtTotalRunes || r.truncating { + r.written += textRuneLength r.writer.Header().Add("Warning", header) + return + } + + // otherwise, enable truncation, reset, and replay the existing items as truncated warnings + r.truncating = true + r.written = 0 + r.writer.Header().Del("Warning") + utilruntime.HandleError(fmt.Errorf("exceeded max warning header size, truncating")) + for _, w := range r.ordered { + agent := w.agent + text := w.text + + textRuneLength := utf8.RuneCountInString(text) + if textRuneLength > truncateItemRunes { + text = string([]rune(text)[:truncateItemRunes]) + textRuneLength = truncateItemRunes + } + if header, err := net.NewWarningHeader(299, agent, text); err == nil { + r.written += textRuneLength + r.writer.Header().Add("Warning", header) + } } } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning_test.go index 2198506a913..24069283b1c 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/warning_test.go @@ -98,3 +98,35 @@ func Test_recorder_AddWarning(t *testing.T) { }) } } + +func TestTruncation(t *testing.T) { + originalTotalRunes, originalItemRunes := truncateAtTotalRunes, truncateItemRunes + truncateAtTotalRunes, truncateItemRunes = 25, 5 + defer func() { + truncateAtTotalRunes, truncateItemRunes = originalTotalRunes, originalItemRunes + }() + + responseRecorder := httptest.NewRecorder() + warningRecorder := &recorder{writer: responseRecorder} + + // add items longer than the individual length + warningRecorder.AddWarning("", "aaaaaaaaaa") // long item + warningRecorder.AddWarning("-", "aaaaaaaaaa") // duplicate item + warningRecorder.AddWarning("", "bb") // short item + warningRecorder.AddWarning("", "ccc") // short item + warningRecorder.AddWarning("", "Iñtërnâtiô") // long item + // check they are preserved + if e, a := []string{`299 - "aaaaaaaaaa"`, `299 - "bb"`, `299 - "ccc"`, `299 - "Iñtërnâtiô"`}, responseRecorder.Header()["Warning"]; !reflect.DeepEqual(e, a) { + t.Errorf("expected\n%#v\ngot\n%#v", e, a) + } + // add an item that exceeds the length and triggers truncation, reducing existing items to 15 runes + warningRecorder.AddWarning("", "e") // triggering item, 16 total + warningRecorder.AddWarning("", "ffffffffff") // long item to get truncated, 21 total + warningRecorder.AddWarning("", "ffffffffff") // duplicate item + warningRecorder.AddWarning("", "gggggggggg") // item to get truncated, 26 total + warningRecorder.AddWarning("", "h") // item to get ignored since we're over our limit + // check that existing items are truncated, and order preserved + if e, a := []string{`299 - "aaaaa"`, `299 - "bb"`, `299 - "ccc"`, `299 - "Iñtër"`, `299 - "e"`, `299 - "fffff"`, `299 - "ggggg"`}, responseRecorder.Header()["Warning"]; !reflect.DeepEqual(e, a) { + t.Errorf("expected\n%#v\ngot\n%#v", e, a) + } +} diff --git a/test/images/agnhost/webhook/convert.go b/test/images/agnhost/webhook/convert.go index 96695f77b52..83b3f82c0bb 100644 --- a/test/images/agnhost/webhook/convert.go +++ b/test/images/agnhost/webhook/convert.go @@ -75,6 +75,7 @@ func convertAdmissionResponseToV1(r *v1beta1.AdmissionResponse) *v1.AdmissionRes Patch: r.Patch, PatchType: pt, Result: r.Result, + Warnings: r.Warnings, } } @@ -91,6 +92,7 @@ func convertAdmissionResponseToV1beta1(r *v1.AdmissionResponse) *v1beta1.Admissi Patch: r.Patch, PatchType: pt, Result: r.Result, + Warnings: r.Warnings, } } diff --git a/test/integration/apiserver/admissionwebhook/admission_test.go b/test/integration/apiserver/admissionwebhook/admission_test.go index 3f92e5f98eb..f73a9dc9984 100644 --- a/test/integration/apiserver/admissionwebhook/admission_test.go +++ b/test/integration/apiserver/admissionwebhook/admission_test.go @@ -169,6 +169,8 @@ type holder struct { t *testing.T + warningHandler *warningHandler + recordGVR metav1.GroupVersionResource recordOperation string recordNamespace string @@ -203,6 +205,7 @@ func (h *holder) reset(t *testing.T) { h.expectOldObject = false h.expectOptionsGVK = schema.GroupVersionKind{} h.expectOptions = false + h.warningHandler.reset() // Set up the recorded map with nil records for all combinations h.recorded = map[webhookOptions]*admissionRequest{} @@ -231,6 +234,7 @@ func (h *holder) expect(gvr schema.GroupVersionResource, gvk, optionsGVK schema. h.expectOldObject = oldObject h.expectOptionsGVK = optionsGVK h.expectOptions = options + h.warningHandler.reset() // Set up the recorded map with nil records for all combinations h.recorded = map[webhookOptions]*admissionRequest{} @@ -314,13 +318,15 @@ func (h *holder) verify(t *testing.T) { defer h.lock.Unlock() for options, value := range h.recorded { - if err := h.verifyRequest(options.converted, value); err != nil { + if err := h.verifyRequest(options, value); err != nil { t.Errorf("version: %v, phase:%v, converted:%v error: %v", options.version, options.phase, options.converted, err) } } } -func (h *holder) verifyRequest(converted bool, request *admissionRequest) error { +func (h *holder) verifyRequest(webhookOptions webhookOptions, request *admissionRequest) error { + converted := webhookOptions.converted + // Check if current resource should be exempted from Admission processing if admissionExemptResources[gvr(h.recordGVR.Group, h.recordGVR.Version, h.recordGVR.Resource)] { if request == nil { @@ -357,6 +363,10 @@ func (h *holder) verifyRequest(converted bool, request *admissionRequest) error return fmt.Errorf("unexpected options: %#v", request.Options.Object) } + if !h.warningHandler.hasWarning(makeWarning(webhookOptions.version, webhookOptions.phase, webhookOptions.converted)) { + return fmt.Errorf("no warning received from webhook") + } + return nil } @@ -384,6 +394,34 @@ func (h *holder) verifyOptions(options runtime.Object) error { return nil } +type warningHandler struct { + lock sync.Mutex + warnings map[string]bool +} + +func (w *warningHandler) reset() { + w.lock.Lock() + defer w.lock.Unlock() + w.warnings = map[string]bool{} +} +func (w *warningHandler) hasWarning(warning string) bool { + w.lock.Lock() + defer w.lock.Unlock() + return w.warnings[warning] +} +func makeWarning(version string, phase string, converted bool) string { + return fmt.Sprintf("%v/%v/%v", version, phase, converted) +} + +func (w *warningHandler) HandleWarningHeader(code int, agent string, message string) { + if code != 299 || len(message) == 0 { + return + } + w.lock.Lock() + defer w.lock.Unlock() + w.warnings[message] = true +} + // TestWebhookAdmissionWithWatchCache tests communication between API server and webhook process. func TestWebhookAdmissionWithWatchCache(t *testing.T) { testWebhookAdmission(t, true) @@ -399,6 +437,7 @@ func testWebhookAdmission(t *testing.T, watchCache bool) { // holder communicates expectations to webhooks, and results from webhooks holder := &holder{ t: t, + warningHandler: &warningHandler{warnings: map[string]bool{}}, gvrToConvertedGVR: map[metav1.GroupVersionResource]metav1.GroupVersionResource{}, gvrToConvertedGVK: map[metav1.GroupVersionResource]schema.GroupVersionKind{}, } @@ -451,6 +490,7 @@ func testWebhookAdmission(t *testing.T, watchCache bool) { clientConfig := rest.CopyConfig(server.ClientConfig) clientConfig.Impersonate.UserName = testClientUsername clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"} + clientConfig.WarningHandler = holder.warningHandler client, err := clientset.NewForConfig(clientConfig) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -1265,6 +1305,9 @@ func newV1beta1WebhookHandler(t *testing.T, holder *holder, phase string, conver review.Kind = "" review.Response.UID = "" + // test plumbing warnings back to the client + review.Response.Warnings = []string{makeWarning("v1beta1", phase, converted)} + // If we're mutating, and have an object, return a patch to exercise conversion if phase == mutation && len(review.Request.Object.Raw) > 0 { review.Response.Patch = []byte(`[{"op":"add","path":"/foo","value":"test"}]`) @@ -1355,6 +1398,9 @@ func newV1WebhookHandler(t *testing.T, holder *holder, phase string, converted b Allowed: true, UID: review.Request.UID, Result: &metav1.Status{Message: "admitted"}, + + // test plumbing warnings back + Warnings: []string{makeWarning("v1", phase, converted)}, } // If we're mutating, and have an object, return a patch to exercise conversion if phase == mutation && len(review.Request.Object.Raw) > 0 {