Merge pull request #28387 from caesarxuchao/gc-latency-measure

Automatic merge from submit-queue

[GarbageCollector] measure latency

First commit is #27600.

In e2e tests, I measure the average time an item spend in the eventQueue(~1.5 ms), dirtyQueue(~13ms), and orphanQueue(~37ms). There is no stress test in e2e yet, so the number may not be useful.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.kubernetes.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.kubernetes.io/reviews/kubernetes/kubernetes/28387)
<!-- Reviewable:end -->
This commit is contained in:
Kubernetes Submit Queue 2016-08-11 02:33:55 -07:00 committed by GitHub
commit 035ec518af
9 changed files with 293 additions and 23 deletions

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/conversion/queryparams" "k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer" "k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -78,6 +79,11 @@ func NewClient(conf *restclient.Config) (*Client, error) {
return &Client{cl: cl}, nil return &Client{cl: cl}, nil
} }
// GetRateLimiter returns rate limier.
func (c *Client) GetRateLimiter() flowcontrol.RateLimiter {
return c.cl.GetRateLimiter()
}
// Resource returns an API interface to the specified resource for this client's // Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace // group and version. If resource is not a namespaced resource, then namespace
// is ignored. // is ignored.

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/clock"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@ -127,7 +128,7 @@ func (m *concurrentUIDToNode) Delete(uid types.UID) {
} }
type Propagator struct { type Propagator struct {
eventQueue *workqueue.Type eventQueue *workqueue.TimedWorkQueue
// uidToNode doesn't require a lock to protect, because only the // uidToNode doesn't require a lock to protect, because only the
// single-threaded Propagator.processEvent() reads/writes it. // single-threaded Propagator.processEvent() reads/writes it.
uidToNode *concurrentUIDToNode uidToNode *concurrentUIDToNode
@ -310,7 +311,7 @@ func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
// the "Orphan" finalizer. The node is add back into the orphanQueue if any of // the "Orphan" finalizer. The node is add back into the orphanQueue if any of
// these steps fail. // these steps fail.
func (gc *GarbageCollector) orphanFinalizer() { func (gc *GarbageCollector) orphanFinalizer() {
key, quit := gc.orphanQueue.Get() key, start, quit := gc.orphanQueue.Get()
if quit { if quit {
return return
} }
@ -330,20 +331,21 @@ func (gc *GarbageCollector) orphanFinalizer() {
err := gc.orhpanDependents(owner.identity, dependents) err := gc.orhpanDependents(owner.identity, dependents)
if err != nil { if err != nil {
glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err) glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(owner) gc.orphanQueue.AddWithTimestamp(owner, start)
return return
} }
// update the owner, remove "orphaningFinalizer" from its finalizers list // update the owner, remove "orphaningFinalizer" from its finalizers list
err = gc.removeOrphanFinalizer(owner) err = gc.removeOrphanFinalizer(owner)
if err != nil { if err != nil {
glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err) glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(owner) gc.orphanQueue.AddWithTimestamp(owner, start)
} }
OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
} }
// Dequeueing an event from eventQueue, updating graph, populating dirty_queue. // Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
func (p *Propagator) processEvent() { func (p *Propagator) processEvent() {
key, quit := p.eventQueue.Get() key, start, quit := p.eventQueue.Get()
if quit { if quit {
return return
} }
@ -420,6 +422,7 @@ func (p *Propagator) processEvent() {
p.gc.dirtyQueue.Add(dep) p.gc.dirtyQueue.Add(dep)
} }
} }
EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, start))
} }
// GarbageCollector is responsible for carrying out cascading deletion, and // GarbageCollector is responsible for carrying out cascading deletion, and
@ -432,11 +435,14 @@ type GarbageCollector struct {
metaOnlyClientPool dynamic.ClientPool metaOnlyClientPool dynamic.ClientPool
// clientPool uses the regular dynamicCodec. We need it to update // clientPool uses the regular dynamicCodec. We need it to update
// finalizers. It can be removed if we support patching finalizers. // finalizers. It can be removed if we support patching finalizers.
clientPool dynamic.ClientPool clientPool dynamic.ClientPool
dirtyQueue *workqueue.Type dirtyQueue *workqueue.TimedWorkQueue
orphanQueue *workqueue.Type orphanQueue *workqueue.TimedWorkQueue
monitors []monitor monitors []monitor
propagator *Propagator propagator *Propagator
clock clock.Clock
registeredRateLimiter *RegisteredRateLimiter
registeredRateLimiterForMonitors *RegisteredRateLimiter
} }
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch { func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
@ -464,14 +470,15 @@ func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionReso
} }
} }
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) { func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
// TODO: consider store in one storage. // TODO: consider store in one storage.
glog.V(6).Infof("create storage for resource %s", resource) glog.V(6).Infof("create storage for resource %s", resource)
var monitor monitor var monitor monitor
client, err := p.gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion()) client, err := gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
if err != nil { if err != nil {
return monitor, err return monitor, err
} }
gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
setObjectTypeMeta := func(obj interface{}) { setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object) runtimeObject, ok := obj.(runtime.Object)
if !ok { if !ok {
@ -491,13 +498,13 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
eventType: addEvent, eventType: addEvent,
obj: obj, obj: obj,
} }
p.eventQueue.Add(event) gc.propagator.eventQueue.Add(event)
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj) setObjectTypeMeta(newObj)
setObjectTypeMeta(oldObj) setObjectTypeMeta(oldObj)
event := event{updateEvent, newObj, oldObj} event := event{updateEvent, newObj, oldObj}
p.eventQueue.Add(event) gc.propagator.eventQueue.Add(event)
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
@ -509,7 +516,7 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
eventType: deleteEvent, eventType: deleteEvent,
obj: obj, obj: obj,
} }
p.eventQueue.Add(event) gc.propagator.eventQueue.Add(event)
}, },
}, },
) )
@ -526,16 +533,20 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
} }
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) { func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
clock := clock.RealClock{}
gc := &GarbageCollector{ gc := &GarbageCollector{
metaOnlyClientPool: metaOnlyClientPool, metaOnlyClientPool: metaOnlyClientPool,
clientPool: clientPool, clientPool: clientPool,
dirtyQueue: workqueue.New(),
orphanQueue: workqueue.New(),
// TODO: should use a dynamic RESTMapper built from the discovery results. // TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper: registered.RESTMapper(), restMapper: registered.RESTMapper(),
clock: clock,
dirtyQueue: workqueue.NewTimedWorkQueue(clock),
orphanQueue: workqueue.NewTimedWorkQueue(clock),
registeredRateLimiter: NewRegisteredRateLimiter(),
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(),
} }
gc.propagator = &Propagator{ gc.propagator = &Propagator{
eventQueue: workqueue.New(), eventQueue: workqueue.NewTimedWorkQueue(gc.clock),
uidToNode: &concurrentUIDToNode{ uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{}, RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node), uidToNode: make(map[types.UID]*node),
@ -551,7 +562,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
if err != nil { if err != nil {
return nil, err return nil, err
} }
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource, kind) monitor, err := gc.monitorFor(resource, kind)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -561,7 +572,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
} }
func (gc *GarbageCollector) worker() { func (gc *GarbageCollector) worker() {
key, quit := gc.dirtyQueue.Get() key, start, quit := gc.dirtyQueue.Get()
if quit { if quit {
return return
} }
@ -570,6 +581,7 @@ func (gc *GarbageCollector) worker() {
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err)) utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err))
} }
DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
} }
// apiResource consults the REST mapper to translate an <apiVersion, kind, // apiResource consults the REST mapper to translate an <apiVersion, kind,
@ -592,6 +604,7 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool
func (gc *GarbageCollector) deleteObject(item objectReference) error { func (gc *GarbageCollector) deleteObject(item objectReference) error {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return err return err
@ -605,6 +618,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference) error {
func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) { func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -615,6 +629,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur
func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) { func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -625,6 +640,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unst
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) { func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -750,6 +766,7 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(gc.worker, 0, stopCh) go wait.Until(gc.worker, 0, stopCh)
go wait.Until(gc.orphanFinalizer, 0, stopCh) go wait.Until(gc.orphanFinalizer, 0, stopCh)
} }
Register()
<-stopCh <-stopCh
glog.Infof("Garbage Collector: Shutting down") glog.Infof("Garbage Collector: Shutting down")
gc.dirtyQueue.ShutDown() gc.dirtyQueue.ShutDown()

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/clock"
"k8s.io/kubernetes/pkg/util/json" "k8s.io/kubernetes/pkg/util/json"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
@ -282,13 +283,14 @@ func TestProcessEvent(t *testing.T) {
for _, scenario := range testScenarios { for _, scenario := range testScenarios {
propagator := &Propagator{ propagator := &Propagator{
eventQueue: workqueue.New(), eventQueue: workqueue.NewTimedWorkQueue(clock.RealClock{}),
uidToNode: &concurrentUIDToNode{ uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{}, RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node), uidToNode: make(map[types.UID]*node),
}, },
gc: &GarbageCollector{ gc: &GarbageCollector{
dirtyQueue: workqueue.New(), dirtyQueue: workqueue.NewTimedWorkQueue(clock.RealClock{}),
clock: clock.RealClock{},
}, },
} }
for i := 0; i < len(scenario.events); i++ { for i := 0; i < len(scenario.events); i++ {

View File

@ -0,0 +1,73 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"sync"
"time"
"k8s.io/kubernetes/pkg/util/clock"
"github.com/prometheus/client_golang/prometheus"
)
const (
GarbageCollectSubsystem = "garbage_collector"
EventProcessingLatencyKey = "event_processing_latency_microseconds"
DirtyProcessingLatencyKey = "dirty_processing_latency_microseconds"
OrphanProcessingLatencyKey = "orphan_processing_latency_microseconds"
)
var (
EventProcessingLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: GarbageCollectSubsystem,
Name: EventProcessingLatencyKey,
Help: "Time in microseconds of an event spend in the eventQueue",
},
)
DirtyProcessingLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: GarbageCollectSubsystem,
Name: DirtyProcessingLatencyKey,
Help: "Time in microseconds of an item spend in the dirtyQueue",
},
)
OrphanProcessingLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: GarbageCollectSubsystem,
Name: OrphanProcessingLatencyKey,
Help: "Time in microseconds of an item spend in the orphanQueue",
},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(EventProcessingLatency)
prometheus.MustRegister(DirtyProcessingLatency)
prometheus.MustRegister(OrphanProcessingLatency)
})
}
func sinceInMicroseconds(clock clock.Clock, start time.Time) float64 {
return float64(clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

View File

@ -0,0 +1,59 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"strings"
"sync"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/util/metrics"
)
// RegisteredRateLimiter records the registered RateLimters to avoid
// duplication.
type RegisteredRateLimiter struct {
rateLimiters map[unversioned.GroupVersion]struct{}
lock sync.RWMutex
}
// NewRegisteredRateLimiter returns a new RegisteredRateLimiater.
func NewRegisteredRateLimiter() *RegisteredRateLimiter {
return &RegisteredRateLimiter{
rateLimiters: make(map[unversioned.GroupVersion]struct{}),
}
}
func (r *RegisteredRateLimiter) registerIfNotPresent(gv unversioned.GroupVersion, client *dynamic.Client, prefix string) {
r.lock.RLock()
_, ok := r.rateLimiters[gv]
r.lock.RUnlock()
if ok {
return
}
r.lock.Lock()
defer r.lock.Unlock()
if _, ok := r.rateLimiters[gv]; !ok {
if rateLimiter := client.GetRateLimiter(); rateLimiter != nil {
group := strings.Replace(gv.Group, ".", ":", -1)
metrics.RegisterMetricAndTrackRateLimiterUsage(fmt.Sprintf("%s_%s_%s", prefix, group, gv.Version), rateLimiter)
}
r.rateLimiters[gv] = struct{}{}
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"time"
"k8s.io/kubernetes/pkg/util/clock"
)
type TimedWorkQueue struct {
*Type
clock clock.Clock
}
type timedWorkQueueItem struct {
time time.Time
obj interface{}
}
func NewTimedWorkQueue(clock clock.Clock) *TimedWorkQueue {
return &TimedWorkQueue{New(), clock}
}
// Add adds the obj along with the current timestamp to the queue.
func (q TimedWorkQueue) Add(obj interface{}) {
start := q.clock.Now()
item := timedWorkQueueItem{start, obj}
q.Type.Add(item)
}
// AddWithTimestamp is useful if the caller does not want to refresh the start
// time when requeuing an item.
func (q TimedWorkQueue) AddWithTimestamp(obj interface{}, timestamp time.Time) {
item := timedWorkQueueItem{timestamp, obj}
q.Type.Add(item)
}
// Get gets the obj along with its timestamp from the queue.
func (q TimedWorkQueue) Get() (item interface{}, start time.Time, shutdown bool) {
item, shutdown = q.Type.Get()
if item != nil {
timed, _ := item.(timedWorkQueueItem)
item = timed.obj
start = timed.time
}
return item, start, shutdown
}

View File

@ -57,6 +57,10 @@ func (m *MetricsForE2E) filterMetrics() {
for _, metric := range InterestingApiServerMetrics { for _, metric := range InterestingApiServerMetrics {
interestingApiServerMetrics[metric] = (*m).ApiServerMetrics[metric] interestingApiServerMetrics[metric] = (*m).ApiServerMetrics[metric]
} }
interestingControllerManagerMetrics := make(metrics.ControllerManagerMetrics)
for _, metric := range InterestingControllerManagerMetrics {
interestingControllerManagerMetrics[metric] = (*m).ControllerManagerMetrics[metric]
}
interestingKubeletMetrics := make(map[string]metrics.KubeletMetrics) interestingKubeletMetrics := make(map[string]metrics.KubeletMetrics)
for kubelet, grabbed := range (*m).KubeletMetrics { for kubelet, grabbed := range (*m).KubeletMetrics {
interestingKubeletMetrics[kubelet] = make(metrics.KubeletMetrics) interestingKubeletMetrics[kubelet] = make(metrics.KubeletMetrics)
@ -65,6 +69,7 @@ func (m *MetricsForE2E) filterMetrics() {
} }
} }
(*m).ApiServerMetrics = interestingApiServerMetrics (*m).ApiServerMetrics = interestingApiServerMetrics
(*m).ControllerManagerMetrics = interestingControllerManagerMetrics
(*m).KubeletMetrics = interestingKubeletMetrics (*m).KubeletMetrics = interestingKubeletMetrics
} }
@ -76,6 +81,12 @@ func (m *MetricsForE2E) PrintHumanReadable() string {
buf.WriteString(fmt.Sprintf("\t%v\n", metrics.PrintSample(sample))) buf.WriteString(fmt.Sprintf("\t%v\n", metrics.PrintSample(sample)))
} }
} }
for _, interestingMetric := range InterestingControllerManagerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ControllerManagerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", metrics.PrintSample(sample)))
}
}
for kubelet, grabbed := range (*m).KubeletMetrics { for kubelet, grabbed := range (*m).KubeletMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", kubelet)) buf.WriteString(fmt.Sprintf("For %v:\n", kubelet))
for _, interestingMetric := range InterestingKubeletMetrics { for _, interestingMetric := range InterestingKubeletMetrics {
@ -104,6 +115,12 @@ var InterestingApiServerMetrics = []string{
"etcd_request_latencies_summary", "etcd_request_latencies_summary",
} }
var InterestingControllerManagerMetrics = []string{
"garbage_collector_event_processing_latency_microseconds",
"garbage_collector_dirty_processing_latency_microseconds",
"garbage_collector_orphan_processing_latency_microseconds",
}
var InterestingKubeletMetrics = []string{ var InterestingKubeletMetrics = []string{
"kubelet_container_manager_latency_microseconds", "kubelet_container_manager_latency_microseconds",
"kubelet_docker_errors", "kubelet_docker_errors",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/metrics"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -97,6 +98,23 @@ func verifyRemainingObjects(f *framework.Framework, clientSet clientset.Interfac
return ret, nil return ret, nil
} }
func gatherMetrics(f *framework.Framework) {
By("Gathering metrics")
var summary framework.TestDataSummary
grabber, err := metrics.NewMetricsGrabber(f.Client, false, false, true, false)
if err != nil {
framework.Logf("Failed to create MetricsGrabber. Skipping metrics gathering.")
} else {
received, err := grabber.Grab()
if err != nil {
framework.Logf("MetricsGrabber failed grab metrics. Skipping metrics gathering.")
} else {
summary = (*framework.MetricsForE2E)(&received)
framework.Logf(summary.PrintHumanReadable())
}
}
}
var _ = framework.KubeDescribe("Garbage collector", func() { var _ = framework.KubeDescribe("Garbage collector", func() {
f := framework.NewDefaultFramework("gc") f := framework.NewDefaultFramework("gc")
It("[Feature:GarbageCollector] should delete pods created by rc when not orphaning", func() { It("[Feature:GarbageCollector] should delete pods created by rc when not orphaning", func() {
@ -147,6 +165,7 @@ var _ = framework.KubeDescribe("Garbage collector", func() {
framework.Failf("remaining pods are: %#v", remainingPods) framework.Failf("remaining pods are: %#v", remainingPods)
} }
} }
gatherMetrics(f)
}) })
It("[Feature:GarbageCollector] should orphan pods created by rc", func() { It("[Feature:GarbageCollector] should orphan pods created by rc", func() {
@ -193,5 +212,6 @@ var _ = framework.KubeDescribe("Garbage collector", func() {
}); err != nil && err != wait.ErrWaitTimeout { }); err != nil && err != wait.ErrWaitTimeout {
framework.Failf("%v", err) framework.Failf("%v", err)
} }
gatherMetrics(f)
}) })
}) })

View File

@ -27,6 +27,7 @@ import (
"testing" "testing"
"time" "time"
dto "github.com/prometheus/client_model/go"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
@ -424,6 +425,19 @@ func TestStressingCascadingDeletion(t *testing.T) {
if gc.GraphHasUID(uids) { if gc.GraphHasUID(uids) {
t.Errorf("Expect all nodes representing replication controllers are removed from the Propagator's graph") t.Errorf("Expect all nodes representing replication controllers are removed from the Propagator's graph")
} }
metric := &dto.Metric{}
garbagecollector.EventProcessingLatency.Write(metric)
count := float64(metric.Summary.GetSampleCount())
sum := metric.Summary.GetSampleSum()
t.Logf("Average time spent in GC's eventQueue is %.1f microseconds", sum/count)
garbagecollector.DirtyProcessingLatency.Write(metric)
count = float64(metric.Summary.GetSampleCount())
sum = metric.Summary.GetSampleSum()
t.Logf("Average time spent in GC's dirtyQueue is %.1f microseconds", sum/count)
garbagecollector.OrphanProcessingLatency.Write(metric)
count = float64(metric.Summary.GetSampleCount())
sum = metric.Summary.GetSampleSum()
t.Logf("Average time spent in GC's orphanQueue is %.1f microseconds", sum/count)
} }
func TestOrphaning(t *testing.T) { func TestOrphaning(t *testing.T) {