Added handling custom metrics to resource consumer.

Added handling custom metrics in Prometheus format to resource consumer.
This commit is contained in:
Jerzy Szczepkowski 2015-12-10 15:46:22 +01:00
parent 05394f9301
commit aade6e78b2
3 changed files with 113 additions and 36 deletions

View File

@ -1,7 +1,6 @@
# Resource Consumer
## Overview
Resource Consumer is a tool which allows to generate cpu/memory utilization in a container.
The reason why it was created is testing kubernetes autoscaling.
Resource Consumer can help with autoscaling tests for:
@ -10,8 +9,7 @@ Resource Consumer can help with autoscaling tests for:
- vertical autoscaling of pod - changing its resource limits.
## Usage
Resource Consumer starts an HTTP server and handle sended requests.
Resource Consumer starts an HTTP server and handle sent requests.
It listens on port given as a flag (default 8080).
Action of consuming resources is send to the container by a POST http request.
Each http request creates new process.
@ -20,12 +18,10 @@ Http request handler is in file resource_consumer_handler.go
The container consumes specified amount of resources:
- CPU in millicores,
- Memory in megabytes.
- Memory in megabytes,
- Fake custom metrics.
###Consume CPU http request
- suffix "ConsumeCPU",
- parameters "millicores" and "durationSec".
@ -36,13 +32,19 @@ and if consumption is too high binary sleeps for 10 millisecond.
One replica of Resource Consumer cannot consume more that 1 cpu.
###Consume Memory http request
- suffix "ConsumeMem",
- parameters "megabytes" and "durationSec".
Consumes specified amount of megabytes for durationSec seconds.
Consume Memory uses stress tool (stress -m 1 --vm-bytes megabytes --vm-hang 0 -t durationSec).
Request leading to consumig more memory then container limit will be ignored.
Request leading to consuming more memory then container limit will be ignored.
###Bump value of a fake custom metric
- suffix "BumpMetric",
- parameters "metric", "delta" and "durationSec".
Bumps metric with given name by delta for durationSec seconds.
Custom metrics in Prometheus format are exposed on "/metrics" endpoint.
###CURL example
```console

View File

@ -27,6 +27,6 @@ var port = flag.Int("port", 8080, "Port number.")
func main() {
flag.Parse()
var resourceConsumerHandler ResourceConsumerHandler
resourceConsumerHandler := NewResourceConsumerHandler()
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), resourceConsumerHandler))
}

View File

@ -21,6 +21,8 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
)
const (
@ -30,17 +32,34 @@ const (
notGivenFunctionArgument = "not given function argument"
consumeCPUAddress = "/ConsumeCPU"
consumeMemAddress = "/ConsumeMem"
bumpMetricAddress = "/BumpMetric"
getCurrentStatusAddress = "/GetCurrentStatus"
metricsAddress = "/metrics"
millicoresQuery = "millicores"
megabytesQuery = "megabytes"
metricNameQuery = "metric"
deltaQuery = "delta"
durationSecQuery = "durationSec"
)
type ResourceConsumerHandler struct{}
type ResourceConsumerHandler struct {
metrics map[string]float64
metricsLock sync.Mutex
}
func NewResourceConsumerHandler() ResourceConsumerHandler {
return ResourceConsumerHandler{metrics: map[string]float64{}}
}
func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// handle exposing metrics in Prometheus format (both GET & POST)
if req.URL.Path == metricsAddress {
handler.handleMetrics(w)
return
}
if req.Method != "POST" {
http.Error(w, badRequest, http.StatusBadRequest)
return
}
// parsing POST request data and URL data
if err := req.ParseForm(); err != nil {
@ -62,6 +81,11 @@ func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *htt
handler.handleGetCurrentStatus(w)
return
}
// handle bumpMetric
if req.URL.Path == bumpMetricAddress {
handler.handleBumpMetric(w, req.Form)
return
}
http.Error(w, unknownFunction, http.StatusNotFound)
}
@ -72,21 +96,20 @@ func (handler ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, q
if durationSecString == "" || millicoresString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
} else {
// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}
// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}
func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
@ -96,19 +119,20 @@ func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, q
if durationSecString == "" || megabytesString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
} else {
// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}
// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}
func (handler ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
@ -116,3 +140,54 @@ func (handler ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWri
fmt.Fprintln(w, "Warning: not implemented!")
fmt.Fprint(w, getCurrentStatusAddress[1:])
}
func (handler ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
handler.metricsLock.Lock()
defer handler.metricsLock.Unlock()
for k, v := range handler.metrics {
fmt.Fprintf(w, "# HELP %s info message.\n", k)
fmt.Fprintf(w, "# TYPE %s gauge\n", k)
fmt.Fprintf(w, "%s %f\n", k, v)
}
}
func (handler ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) {
handler.metricsLock.Lock()
if _, ok := handler.metrics[metric]; ok {
handler.metrics[metric] += delta
} else {
handler.metrics[metric] = delta
}
handler.metricsLock.Unlock()
time.Sleep(duration)
handler.metricsLock.Lock()
handler.metrics[metric] -= delta
handler.metricsLock.Unlock()
}
func (handler ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
// geting string data for handleBumpMetric
metric := query.Get(metricNameQuery)
deltaString := query.Get(deltaQuery)
durationSecString := query.Get(durationSecQuery)
if durationSecString == "" || metric == "" || deltaString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
}
// convert data (strings to ints/floats) for handleBumpMetric
durationSec, durationSecError := strconv.Atoi(durationSecString)
delta, deltaError := strconv.ParseFloat(deltaString, 64)
if durationSecError != nil || deltaError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
fmt.Fprintln(w, bumpMetricAddress[1:])
fmt.Fprintln(w, metric, metricNameQuery)
fmt.Fprintln(w, delta, deltaQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}