Merge pull request #135815 from mborsz/automated-cherry-pick-of-#135367-mborsz-release-1.35

Automated cherry pick of #135367: Fix apiserver_watch_events_sizes metric.
This commit is contained in:
Kubernetes Prow Robot
2026-01-16 02:05:12 +05:30
committed by GitHub
3 changed files with 180 additions and 4 deletions

View File

@@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/util/apihelpers"
@@ -193,7 +192,6 @@ func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writ
Type: string(event.Type),
Object: runtime.RawExtension{Raw: e.buffer.Bytes()},
}
metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.groupVersionResource.Group, e.groupVersionResource.Version, e.groupVersionResource.Resource).Observe(float64(len(outEvent.Object.Raw)))
defer e.eventBuffer.Reset()
if err := e.encoder.Encode(outEvent, e.eventBuffer); err != nil {

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"net/http"
"sync/atomic"
"time"
"golang.org/x/net/websocket"
@@ -37,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
compbasemetrics "k8s.io/component-base/metrics"
)
// timeoutFactory abstracts watch timeout logic for testing
@@ -202,6 +204,29 @@ type WatchServer struct {
metricsScope string
}
// watchEventMetricsRecorder allows the caller to count bytes written and report the size of the event.
// It is thread-safe, as long as underlying io.Writer is thread-safe.
// Once all Write calls for a given watch event have finished, RecordEvent must be called.
type watchEventMetricsRecorder struct {
	// writer is the destination all Write calls are forwarded to.
	writer io.Writer
	// countMetric is incremented once per watch event (see RecordEvent).
	countMetric compbasemetrics.CounterMetric
	// sizeMetric observes the total bytes written for each watch event.
	sizeMetric compbasemetrics.ObserverMetric
	// byteCount accumulates bytes written since the last RecordEvent;
	// atomic so concurrent Write calls can update it safely.
	byteCount atomic.Int64
}
// Write implements io.Writer. It delegates to the wrapped writer and adds
// the number of bytes actually written to the running per-event total.
func (c *watchEventMetricsRecorder) Write(p []byte) (int, error) {
	written, err := c.writer.Write(p)
	c.byteCount.Add(int64(written))
	return written, err
}
// RecordEvent reports the metrics for one completed watch event: it bumps the
// event counter and observes the bytes accumulated since the previous event,
// atomically resetting the running total to zero for the next event.
func (c *watchEventMetricsRecorder) RecordEvent() {
	size := c.byteCount.Swap(0)
	c.countMetric.Inc()
	c.sizeMetric.Observe(float64(size))
}
// HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
// or over a websocket connection.
func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
@@ -239,7 +264,14 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
flusher.Flush()
gvr := s.Scope.Resource
watchEncoder := newWatchEncoder(req.Context(), gvr, s.EmbeddedEncoder, s.Encoder, framer)
recorder := &watchEventMetricsRecorder{
writer: framer,
countMetric: metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(gvr.Group, gvr.Version, gvr.Resource),
sizeMetric: metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(gvr.Group, gvr.Version, gvr.Resource),
}
watchEncoder := newWatchEncoder(req.Context(), gvr, s.EmbeddedEncoder, s.Encoder, recorder)
ch := s.Watching.ResultChan()
done := req.Context().Done()
@@ -263,7 +295,6 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
// End of results.
return
}
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(gvr.Group, gvr.Version, gvr.Resource).Inc()
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
if err := watchEncoder.Encode(event); err != nil {
@@ -271,6 +302,7 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
// client disconnect.
return
}
recorder.RecordEvent()
if len(ch) == 0 {
flusher.Flush()

View File

@@ -17,6 +17,7 @@ limitations under the License.
package handlers
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -24,6 +25,8 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"
@@ -36,9 +39,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
endpointstesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/metrics/legacyregistry"
metricstestutil "k8s.io/component-base/metrics/testutil"
)
// Fake API versions, similar to api/latest.go
@@ -341,3 +347,143 @@ func serveWatch(watcher watch.Interface, watchServer *WatchServer, preServeErr e
watchServer.HandleHTTP(w, req)
}
}
// fakeCachingObject wraps a runtime.Object so tests can exercise the
// runtime.CacheableObject encoding path: the wrapped object is encoded at
// most once and the cached bytes are replayed on later CacheEncode calls.
type fakeCachingObject struct {
	// obj is the wrapped object to encode.
	obj runtime.Object
	// once guards the one-time encoding performed in CacheEncode.
	once sync.Once
	// raw caches the encoded bytes of obj; err caches the encode error.
	raw []byte
	err error
}
// CacheEncode implements runtime.CacheableObject. The first call encodes the
// wrapped object into an in-memory buffer; every call (including the first)
// then either returns the cached encode error or replays the cached bytes to w.
func (f *fakeCachingObject) CacheEncode(_ runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
	f.once.Do(func() {
		var buf bytes.Buffer
		f.err = encode(f.obj, &buf)
		f.raw = buf.Bytes()
	})
	if f.err != nil {
		return f.err
	}
	_, writeErr := w.Write(f.raw)
	return writeErr
}
// GetObject implements runtime.CacheableObject by returning the wrapped object.
func (f *fakeCachingObject) GetObject() runtime.Object {
	return f.obj
}
// GetObjectKind implements runtime.Object by delegating to the wrapped object.
func (f *fakeCachingObject) GetObjectKind() schema.ObjectKind {
	return f.obj.GetObjectKind()
}
// DeepCopyObject implements runtime.Object. Only the wrapped object is deep
// copied; the cached encoding state (once/raw/err) is left at its zero value,
// so the copy re-encodes on first use.
// NOTE(review): not carrying the cache over looks intentional for a test fake
// (sync.Once must not be copied by value) — confirm if the cache ever matters.
func (f *fakeCachingObject) DeepCopyObject() runtime.Object {
	return &fakeCachingObject{obj: f.obj.DeepCopyObject()}
}
var _ runtime.CacheableObject = &fakeCachingObject{}
var _ runtime.Object = &fakeCachingObject{}
// TestWatchEventSizes verifies that serving watch events records both the
// apiserver_watch_events_total counter and the apiserver_watch_events_sizes
// histogram, covering the regular encoding path as well as the
// runtime.CacheableObject path (which writes pre-encoded bytes directly).
func TestWatchEventSizes(t *testing.T) {
	metrics.Register()
	gvr := schema.GroupVersionResource{Group: "group", Version: "version", Resource: "resource"}
	testCases := []struct {
		name  string
		event watch.Event
		// wantSize is the expected histogram sum across both sent events
		// (each event is sent twice; the per-event sizes below are the
		// serialized event lengths — presumably stable for these fixtures).
		wantSize int64
	}{
		{
			name: "regular object",
			event: watch.Event{
				Type:   watch.Added,
				Object: &endpointstesting.Simple{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
			},
			wantSize: 2 * 98,
		},
		{
			name: "cached object",
			event: watch.Event{
				Type: watch.Added,
				// Wrapping in fakeCachingObject routes encoding through
				// CacheEncode, exercising the cached-bytes code path.
				Object: &fakeCachingObject{obj: &runtime.Unknown{
					Raw:         []byte(`{"kind":"Simple","apiVersion":"v1","metadata":{"name":"foo"}}`),
					ContentType: runtime.ContentTypeJSON,
				}},
			},
			wantSize: 2 * 88,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx := t.Context()
			// Reset both metrics so each subtest asserts against a clean state.
			metrics.WatchEventsSizes.Reset()
			metrics.WatchEvents.Reset()
			watcher := watch.NewFake()
			timeoutCh := make(chan time.Time)
			doneCh := make(chan struct{})
			info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
			require.True(t, ok)
			require.NotNil(t, info.StreamSerializer)
			serializer := info.StreamSerializer
			watchServer := &WatchServer{
				Scope: &RequestScope{
					Resource: gvr,
				},
				Watching:        watcher,
				MediaType:       "application/json",
				Framer:          serializer.Framer,
				Encoder:         testCodecV2,
				EmbeddedEncoder: testCodecV2,
				TimeoutFactory:  &fakeTimeoutFactory{timeoutCh: timeoutCh, done: doneCh},
			}
			s := httptest.NewServer(serveWatch(watcher, watchServer, nil))
			defer s.Close()
			// Setup a client
			dest, _ := url.Parse(s.URL)
			dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
			dest.RawQuery = "watch=true"
			req, _ := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
			client := http.Client{}
			resp, err := client.Do(req)
			require.NoError(t, err)
			defer apitesting.Close(t, resp.Body)
			// Send object twice so that in case of caching the cached version is used.
			watcher.Action(tc.event.Type, tc.event.Object)
			watcher.Action(tc.event.Type, tc.event.Object)
			// Fire the watch timeout and wait for the server handler to
			// finish, guaranteeing both events were encoded and recorded.
			close(timeoutCh)
			<-doneCh
			expected := fmt.Sprintf(`# HELP apiserver_watch_events_sizes [ALPHA] Watch event size distribution in bytes
# TYPE apiserver_watch_events_sizes histogram
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="1024"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="2048"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="4096"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="8192"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="16384"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="32768"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="65536"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="131072"} 2
apiserver_watch_events_sizes_bucket{group="group",resource="resource",version="version",le="+Inf"} 2
apiserver_watch_events_sizes_sum{group="group",resource="resource",version="version"} %d
apiserver_watch_events_sizes_count{group="group",resource="resource",version="version"} 2
# HELP apiserver_watch_events_total [ALPHA] Number of events sent in watch clients
# TYPE apiserver_watch_events_total counter
apiserver_watch_events_total{group="group",resource="resource",version="version"} 2
`, tc.wantSize)
			err = metricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), "apiserver_watch_events_sizes", "apiserver_watch_events_total")
			require.NoError(t, err)
		})
	}
}