mirror of
				https://github.com/distribution/distribution.git
				synced 2025-10-25 14:28:28 +00:00 
			
		
		
		
	Since the notifications package is now decoupled from storage, we are moving it to the root package. Signed-off-by: Stephen J Day <stephen.day@docker.com>
		
			
				
	
	
		
			153 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			153 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package notifications
 | |
| 
 | |
| import (
 | |
| 	"expvar"
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
// EndpointMetrics holds the counters kept for a single endpoint, mostly
// expressed as numbers of events. The goal is to export these values via
// expvar, although some other solution may turn out to be better in the
// future.
type EndpointMetrics struct {
	// Pending is the number of events pending in the queue.
	Pending int
	// Events is the total number of incoming events.
	Events int
	// Successes is the total number of events written successfully.
	Successes int
	// Failures is the total number of events that failed.
	Failures int
	// Errors is the total number of events that errored.
	Errors int
	// Statuses is a status code histogram, counted per call event.
	Statuses map[string]int
}
 | |
| 
 | |
| // safeMetrics guards the metrics implementation with a lock and provides a
 | |
| // safe update function.
 | |
| type safeMetrics struct {
 | |
| 	EndpointMetrics
 | |
| 	sync.Mutex // protects statuses map
 | |
| }
 | |
| 
 | |
| // newSafeMetrics returns safeMetrics with map allocated.
 | |
| func newSafeMetrics() *safeMetrics {
 | |
| 	var sm safeMetrics
 | |
| 	sm.Statuses = make(map[string]int)
 | |
| 	return &sm
 | |
| }
 | |
| 
 | |
| // httpStatusListener returns the listener for the http sink that updates the
 | |
| // relevent counters.
 | |
| func (sm *safeMetrics) httpStatusListener() httpStatusListener {
 | |
| 	return &endpointMetricsHTTPStatusListener{
 | |
| 		safeMetrics: sm,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // eventQueueListener returns a listener that maintains queue related counters.
 | |
| func (sm *safeMetrics) eventQueueListener() eventQueueListener {
 | |
| 	return &endpointMetricsEventQueueListener{
 | |
| 		safeMetrics: sm,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // endpointMetricsHTTPStatusListener increments counters related to http sinks
 | |
| // for the relevent events.
 | |
| type endpointMetricsHTTPStatusListener struct {
 | |
| 	*safeMetrics
 | |
| }
 | |
| 
 | |
| var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
 | |
| 
 | |
| func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
 | |
| 	emsl.safeMetrics.Lock()
 | |
| 	defer emsl.safeMetrics.Unlock()
 | |
| 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 | |
| 	emsl.Successes += len(events)
 | |
| }
 | |
| 
 | |
| func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
 | |
| 	emsl.safeMetrics.Lock()
 | |
| 	defer emsl.safeMetrics.Unlock()
 | |
| 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 | |
| 	emsl.Failures += len(events)
 | |
| }
 | |
| 
 | |
| func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
 | |
| 	emsl.safeMetrics.Lock()
 | |
| 	defer emsl.safeMetrics.Unlock()
 | |
| 	emsl.Errors += len(events)
 | |
| }
 | |
| 
 | |
| // endpointMetricsEventQueueListener maintains the incoming events counter and
 | |
| // the queues pending count.
 | |
| type endpointMetricsEventQueueListener struct {
 | |
| 	*safeMetrics
 | |
| }
 | |
| 
 | |
| func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
 | |
| 	eqc.Lock()
 | |
| 	defer eqc.Unlock()
 | |
| 	eqc.Events += len(events)
 | |
| 	eqc.Pending += len(events)
 | |
| }
 | |
| 
 | |
| func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
 | |
| 	eqc.Lock()
 | |
| 	defer eqc.Unlock()
 | |
| 	eqc.Pending -= len(events)
 | |
| }
 | |
| 
 | |
| // endpoints is global registry of endpoints used to report metrics to expvar
 | |
| var endpoints struct {
 | |
| 	registered []*Endpoint
 | |
| 	mu         sync.Mutex
 | |
| }
 | |
| 
 | |
| // register places the endpoint into expvar so that stats are tracked.
 | |
| func register(e *Endpoint) {
 | |
| 	endpoints.mu.Lock()
 | |
| 	defer endpoints.mu.Unlock()
 | |
| 
 | |
| 	endpoints.registered = append(endpoints.registered, e)
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
 | |
| 	// Ideally, we do more metrics through logging but we need some nice
 | |
| 	// realtime metrics for queue state for now.
 | |
| 
 | |
| 	registry := expvar.Get("registry")
 | |
| 
 | |
| 	if registry == nil {
 | |
| 		registry = expvar.NewMap("registry")
 | |
| 	}
 | |
| 
 | |
| 	var notifications expvar.Map
 | |
| 	notifications.Init()
 | |
| 	notifications.Set("endpoints", expvar.Func(func() interface{} {
 | |
| 		endpoints.mu.Lock()
 | |
| 		defer endpoints.mu.Unlock()
 | |
| 
 | |
| 		var names []interface{}
 | |
| 		for _, v := range endpoints.registered {
 | |
| 			var epjson struct {
 | |
| 				Name string `json:"name"`
 | |
| 				URL  string `json:"url"`
 | |
| 				EndpointConfig
 | |
| 
 | |
| 				Metrics EndpointMetrics
 | |
| 			}
 | |
| 
 | |
| 			epjson.Name = v.Name()
 | |
| 			epjson.URL = v.URL()
 | |
| 			epjson.EndpointConfig = v.EndpointConfig
 | |
| 
 | |
| 			v.ReadMetrics(&epjson.Metrics)
 | |
| 
 | |
| 			names = append(names, epjson)
 | |
| 		}
 | |
| 
 | |
| 		return names
 | |
| 	}))
 | |
| 
 | |
| 	registry.(*expvar.Map).Set("notifications", ¬ifications)
 | |
| }
 |