Rework of resource consumer.

Major rework of resource consumer: added controller running as a pod that spreads requests around consumers. This should fix #21664 and #23536.
This commit is contained in:
Jerzy Szczepkowski 2016-05-10 09:42:12 +02:00
parent 2976e892a4
commit 967a7c95d2
8 changed files with 429 additions and 136 deletions

View File

@ -70,6 +70,9 @@ configure-cbr0
configure-cloud-routes
conntrack-max
conntrack-tcp-timeout-established
consumer-port
consumer-service-name
consumer-service-namespace
contain-pod-resources
container-port
container-runtime
@ -463,3 +466,4 @@ watch-only
whitelist-override-label
windows-line-endings
www-prefix

View File

@ -40,7 +40,8 @@ const (
timeoutRC = 120 * time.Second
startServiceTimeout = time.Minute
startServiceInterval = 5 * time.Second
resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta2"
resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta4"
resourceConsumerControllerImage = "gcr.io/google_containers/resource_consumer/controller:beta4"
rcIsNil = "ERROR: replicationController = nil"
deploymentIsNil = "ERROR: deployment = nil"
rsIsNil = "ERROR: replicaset = nil"
@ -58,6 +59,7 @@ rc.ConsumeCPU(300)
*/
type ResourceConsumer struct {
name string
controllerName string
kind string
framework *framework.Framework
cpu chan int
@ -97,6 +99,7 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo
runServiceAndWorkloadForResourceConsumer(f.Client, f.Namespace.Name, name, kind, replicas, cpuLimit, memLimit)
rc := &ResourceConsumer{
name: name,
controllerName: name + "-controller",
kind: kind,
framework: f,
cpu: make(chan int),
@ -111,8 +114,10 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo
requestSizeInMegabytes: requestSizeInMegabytes,
requestSizeCustomMetric: requestSizeCustomMetric,
}
go rc.makeConsumeCPURequests()
rc.ConsumeCPU(initCPUTotal)
go rc.makeConsumeMemRequests()
rc.ConsumeMem(initMemoryTotal)
go rc.makeConsumeCustomMetric()
@ -140,25 +145,15 @@ func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
func (rc *ResourceConsumer) makeConsumeCPURequests() {
defer GinkgoRecover()
var count int
var rest int
sleepTime := time.Duration(0)
millicores := 0
for {
select {
case millicores := <-rc.cpu:
framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores)
if rc.requestSizeInMillicores != 0 {
count = millicores / rc.requestSizeInMillicores
}
rest = millicores - count*rc.requestSizeInMillicores
case millicores = <-rc.cpu:
framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending %v requests to consume %v millicores each and 1 request to consume %v millicores", rc.name, count, rc.requestSizeInMillicores, rest)
if count > 0 {
rc.sendConsumeCPURequests(count, rc.requestSizeInMillicores, rc.consumptionTimeInSeconds)
}
if rest > 0 {
go rc.sendOneConsumeCPURequest(rest, rc.consumptionTimeInSeconds)
}
framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores)
rc.sendConsumeCPURequest(millicores)
sleepTime = rc.sleepTime
case <-rc.stopCPU:
return
@ -168,25 +163,15 @@ func (rc *ResourceConsumer) makeConsumeCPURequests() {
func (rc *ResourceConsumer) makeConsumeMemRequests() {
defer GinkgoRecover()
var count int
var rest int
sleepTime := time.Duration(0)
megabytes := 0
for {
select {
case megabytes := <-rc.mem:
framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes)
if rc.requestSizeInMegabytes != 0 {
count = megabytes / rc.requestSizeInMegabytes
}
rest = megabytes - count*rc.requestSizeInMegabytes
case megabytes = <-rc.mem:
framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending %v requests to consume %v MB each and 1 request to consume %v MB", rc.name, count, rc.requestSizeInMegabytes, rest)
if count > 0 {
rc.sendConsumeMemRequests(count, rc.requestSizeInMegabytes, rc.consumptionTimeInSeconds)
}
if rest > 0 {
go rc.sendOneConsumeMemRequest(rest, rc.consumptionTimeInSeconds)
}
framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes)
rc.sendConsumeMemRequest(megabytes)
sleepTime = rc.sleepTime
case <-rc.stopMem:
return
@ -196,26 +181,15 @@ func (rc *ResourceConsumer) makeConsumeMemRequests() {
func (rc *ResourceConsumer) makeConsumeCustomMetric() {
defer GinkgoRecover()
var count int
var rest int
sleepTime := time.Duration(0)
delta := 0
for {
select {
case total := <-rc.customMetric:
framework.Logf("RC %s: consume custom metric %v in total", rc.name, total)
if rc.requestSizeInMegabytes != 0 {
count = total / rc.requestSizeCustomMetric
}
rest = total - count*rc.requestSizeCustomMetric
case delta := <-rc.customMetric:
framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending %v requests to consume %v custom metric each and 1 request to consume %v",
rc.name, count, rc.requestSizeCustomMetric, rest)
if count > 0 {
rc.sendConsumeCustomMetric(count, rc.requestSizeCustomMetric, rc.consumptionTimeInSeconds)
}
if rest > 0 {
go rc.sendOneConsumeCustomMetric(rest, rc.consumptionTimeInSeconds)
}
framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
rc.sendConsumeCustomMetric(delta)
sleepTime = rc.sleepTime
case <-rc.stopCustomMetric:
return
@ -223,64 +197,48 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() {
}
}
func (rc *ResourceConsumer) sendConsumeCPURequests(requests, millicores, durationSec int) {
for i := 0; i < requests; i++ {
go rc.sendOneConsumeCPURequest(millicores, durationSec)
}
}
func (rc *ResourceConsumer) sendConsumeMemRequests(requests, megabytes, durationSec int) {
for i := 0; i < requests; i++ {
go rc.sendOneConsumeMemRequest(megabytes, durationSec)
}
}
func (rc *ResourceConsumer) sendConsumeCustomMetric(requests, delta, durationSec int) {
for i := 0; i < requests; i++ {
go rc.sendOneConsumeCustomMetric(delta, durationSec)
}
}
// sendOneConsumeCPURequest sends POST request for cpu consumption
func (rc *ResourceConsumer) sendOneConsumeCPURequest(millicores int, durationSec int) {
defer GinkgoRecover()
func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
framework.ExpectNoError(err)
_, err = proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.name).
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.controllerName).
Suffix("ConsumeCPU").
Param("millicores", strconv.Itoa(millicores)).
Param("durationSec", strconv.Itoa(durationSec)).
DoRaw()
Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores))
framework.Logf("URL: %v", *req.URL())
_, err = req.DoRaw()
framework.ExpectNoError(err)
}
// sendOneConsumeMemRequest sends POST request for memory consumption
func (rc *ResourceConsumer) sendOneConsumeMemRequest(megabytes int, durationSec int) {
defer GinkgoRecover()
// sendConsumeMemRequest sends POST request for memory consumption
func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
framework.ExpectNoError(err)
_, err = proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.name).
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.controllerName).
Suffix("ConsumeMem").
Param("megabytes", strconv.Itoa(megabytes)).
Param("durationSec", strconv.Itoa(durationSec)).
DoRaw()
Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes))
framework.Logf("URL: %v", *req.URL())
_, err = req.DoRaw()
framework.ExpectNoError(err)
}
// sendOneConsumeCustomMetric sends POST request for custom metric consumption
func (rc *ResourceConsumer) sendOneConsumeCustomMetric(delta int, durationSec int) {
defer GinkgoRecover()
// sendConsumeCustomMetric sends POST request for custom metric consumption
func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
framework.ExpectNoError(err)
_, err = proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.name).
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.controllerName).
Suffix("BumpMetric").
Param("metric", customMetricName).
Param("delta", strconv.Itoa(delta)).
Param("durationSec", strconv.Itoa(durationSec)).
DoRaw()
Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric))
framework.Logf("URL: %v", *req.URL())
_, err = req.DoRaw()
framework.ExpectNoError(err)
}
@ -346,6 +304,8 @@ func (rc *ResourceConsumer) CleanUp() {
time.Sleep(10 * time.Second)
framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.name))
framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.name))
framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.controllerName))
framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.controllerName))
}
func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind string, replicas int, cpuLimitMillis, memLimitMb int64) {
@ -400,6 +360,38 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s
framework.Failf(invalidKind)
}
By(fmt.Sprintf("Running controller"))
controllerName := name + "-controller"
_, err = c.Services(ns).Create(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: controllerName,
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Port: port,
TargetPort: intstr.FromInt(targetPort),
}},
Selector: map[string]string{
"name": controllerName,
},
},
})
framework.ExpectNoError(err)
dnsClusterFirst := api.DNSClusterFirst
controllerRcConfig := framework.RCConfig{
Client: c,
Image: resourceConsumerControllerImage,
Name: controllerName,
Namespace: ns,
Timeout: timeoutRC,
Replicas: 1,
Command: []string{"/controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
DNSPolicy: &dnsClusterFirst,
}
framework.ExpectNoError(framework.RunRC(controllerRcConfig))
// Make sure endpoints are propagated.
// TODO(piosz): replace sleep with endpoints watch.
time.Sleep(10 * time.Second)

View File

@ -271,6 +271,7 @@ type RCConfig struct {
MemRequest int64 // bytes
MemLimit int64 // bytes
ReadinessProbe *api.Probe
DNSPolicy *api.DNSPolicy
// Env vars, set the same for every pod.
Env map[string]string
@ -2181,6 +2182,10 @@ func RunRC(config RCConfig) error {
func (config *RCConfig) create() error {
By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace))
dnsDefault := api.DNSDefault
if config.DNSPolicy == nil {
config.DNSPolicy = &dnsDefault
}
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: config.Name,
@ -2204,7 +2209,7 @@ func (config *RCConfig) create() error {
ReadinessProbe: config.ReadinessProbe,
},
},
DNSPolicy: api.DNSDefault,
DNSPolicy: *config.DNSPolicy,
},
},
},

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
TAG = beta2
TAG = beta4
PREFIX = gcr.io/google_containers
all: clean consumer
@ -20,18 +20,22 @@ all: clean consumer
consumer:
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consume-cpu/consume-cpu ./consume-cpu/consume_cpu.go
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consumer .
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o controller/controller ./controller/controller.go
container: image
image:
sudo docker build -t $(PREFIX)/resource_consumer:$(TAG) .
sudo docker build -t $(PREFIX)/resource_consumer/controller:$(TAG) controller
run_container:
docker run --publish=8080:8080 $(PREFIX)/resource_consumer:$(TAG)
push:
@echo "This image is not meant to be pushed."
gcloud docker push ${PREFIX}/resource_consumer:${TAG}
gcloud docker push ${PREFIX}/resource_consumer/controller:${TAG}
clean:
rm -f consumer
rm -f consume-cpu/consume-cpu
rm -f controller/controller

View File

@ -0,0 +1,41 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
const (
ConsumeCPUAddress = "/ConsumeCPU"
ConsumeMemAddress = "/ConsumeMem"
BumpMetricAddress = "/BumpMetric"
GetCurrentStatusAddress = "/GetCurrentStatus"
MetricsAddress = "/Metrics"
MillicoresQuery = "millicores"
MegabytesQuery = "megabytes"
MetricNameQuery = "metric"
DeltaQuery = "delta"
DurationSecQuery = "durationSec"
RequestSizeInMillicoresQuery = "requestSizeMillicores"
RequestSizeInMegabytesQuery = "requestSizeMegabytes"
RequestSizeCustomMetricQuery = "requestSizeMetrics"
BadRequest = "Bad request. Not a POST request"
UnknownFunction = "unknown function"
IncorrectFunctionArgument = "incorrect function argument"
NotGivenFunctionArgument = "not given function argument"
FrameworkName = "horizontal-pod-autoscaling"
)

View File

@ -0,0 +1,19 @@
# Copyright 2016 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM busybox
MAINTAINER Jerzy Szczepkowski <jsz@google.com>
ADD controller /controller
EXPOSE 8080
ENTRYPOINT ["/controller"]

View File

@ -0,0 +1,243 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"log"
"net/http"
"net/url"
"strconv"
"sync"
. "k8s.io/kubernetes/test/images/resource-consumer/common"
)
var port = flag.Int("port", 8080, "Port number.")
var consumerPort = flag.Int("consumer-port", 8080, "Port number of consumers.")
var consumerServiceName = flag.String("consumer-service-name", "resource-consumer", "Name of service containing resource consumers.")
var consumerServiceNamespace = flag.String("consumer-service-namespace", "default", "Namespace of service containing resource consumers.")
func main() {
flag.Parse()
mgr := NewController()
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), mgr))
}
type Controller struct {
responseWriterLock sync.Mutex
waitGroup sync.WaitGroup
}
func NewController() *Controller {
c := &Controller{}
return c
}
func (handler *Controller) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
http.Error(w, BadRequest, http.StatusBadRequest)
return
}
// parsing POST request data and URL data
if err := req.ParseForm(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// handle consumeCPU
if req.URL.Path == ConsumeCPUAddress {
handler.handleConsumeCPU(w, req.Form)
return
}
// handle consumeMem
if req.URL.Path == ConsumeMemAddress {
handler.handleConsumeMem(w, req.Form)
return
}
// handle bumpMetric
if req.URL.Path == BumpMetricAddress {
handler.handleBumpMetric(w, req.Form)
return
}
http.Error(w, UnknownFunction, http.StatusNotFound)
}
func (handler *Controller) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
// geting string data for consumeCPU
durationSecString := query.Get(DurationSecQuery)
millicoresString := query.Get(MillicoresQuery)
requestSizeInMillicoresString := query.Get(RequestSizeInMillicoresQuery)
if durationSecString == "" || millicoresString == "" || requestSizeInMillicoresString == "" {
http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest)
return
}
// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
requestSizeInMillicores, requestSizeInMillicoresError := strconv.Atoi(requestSizeInMillicoresString)
if durationSecError != nil || millicoresError != nil || requestSizeInMillicoresError != nil || requestSizeInMillicores <= 0 {
http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest)
return
}
count := millicores / requestSizeInMillicores
rest := millicores - count*requestSizeInMillicores
fmt.Fprintf(w, "RC manager: sending %v requests to consume %v millicores each and 1 request to consume %v millicores\n", count, requestSizeInMillicores, rest)
if count > 0 {
handler.waitGroup.Add(count)
handler.sendConsumeCPURequests(w, count, requestSizeInMillicores, durationSec)
}
if rest > 0 {
handler.waitGroup.Add(1)
go handler.sendOneConsumeCPURequest(w, rest, durationSec)
}
handler.waitGroup.Wait()
}
func (handler *Controller) handleConsumeMem(w http.ResponseWriter, query url.Values) {
// geting string data for consumeMem
durationSecString := query.Get(DurationSecQuery)
megabytesString := query.Get(MegabytesQuery)
requestSizeInMegabytesString := query.Get(RequestSizeInMegabytesQuery)
if durationSecString == "" || megabytesString == "" || requestSizeInMegabytesString == "" {
http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest)
return
}
// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
requestSizeInMegabytes, requestSizeInMegabytesError := strconv.Atoi(requestSizeInMegabytesString)
if durationSecError != nil || megabytesError != nil || requestSizeInMegabytesError != nil || requestSizeInMegabytes <= 0 {
http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest)
return
}
count := megabytes / requestSizeInMegabytes
rest := megabytes - count*requestSizeInMegabytes
fmt.Fprintf(w, "RC manager: sending %v requests to consume %v MB each and 1 request to consume %v MB\n", count, requestSizeInMegabytes, rest)
if count > 0 {
handler.waitGroup.Add(count)
handler.sendConsumeMemRequests(w, count, requestSizeInMegabytes, durationSec)
}
if rest > 0 {
handler.waitGroup.Add(1)
go handler.sendOneConsumeMemRequest(w, rest, durationSec)
}
handler.waitGroup.Wait()
}
func (handler *Controller) handleBumpMetric(w http.ResponseWriter, query url.Values) {
// geting string data for handleBumpMetric
metric := query.Get(MetricNameQuery)
deltaString := query.Get(DeltaQuery)
durationSecString := query.Get(DurationSecQuery)
requestSizeCustomMetricString := query.Get(RequestSizeCustomMetricQuery)
if durationSecString == "" || metric == "" || deltaString == "" || requestSizeCustomMetricString == "" {
http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest)
return
}
// convert data (strings to ints/floats) for handleBumpMetric
durationSec, durationSecError := strconv.Atoi(durationSecString)
delta, deltaError := strconv.Atoi(deltaString)
requestSizeCustomMetric, requestSizeCustomMetricError := strconv.Atoi(requestSizeCustomMetricString)
if durationSecError != nil || deltaError != nil || requestSizeCustomMetricError != nil || requestSizeCustomMetric <= 0 {
http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest)
return
}
count := delta / requestSizeCustomMetric
rest := delta - count*requestSizeCustomMetric
fmt.Fprintf(w, "RC manager: sending %v requests to bump custom metric by %v each and 1 request to bump by %v\n", count, requestSizeCustomMetric, rest)
if count > 0 {
handler.waitGroup.Add(count)
handler.sendConsumeCustomMetric(w, metric, count, requestSizeCustomMetric, durationSec)
}
if rest > 0 {
handler.waitGroup.Add(1)
go handler.sendOneConsumeCustomMetric(w, metric, rest, durationSec)
}
handler.waitGroup.Wait()
}
func (manager *Controller) sendConsumeCPURequests(w http.ResponseWriter, requests, millicores, durationSec int) {
for i := 0; i < requests; i++ {
go manager.sendOneConsumeCPURequest(w, millicores, durationSec)
}
}
func (manager *Controller) sendConsumeMemRequests(w http.ResponseWriter, requests, megabytes, durationSec int) {
for i := 0; i < requests; i++ {
go manager.sendOneConsumeMemRequest(w, megabytes, durationSec)
}
}
func (manager *Controller) sendConsumeCustomMetric(w http.ResponseWriter, metric string, requests, delta, durationSec int) {
for i := 0; i < requests; i++ {
go manager.sendOneConsumeCustomMetric(w, metric, delta, durationSec)
}
}
func createConsumerURL(suffix string) string {
return fmt.Sprintf("http://%s.%s.svc.cluster.local:%d%s", *consumerServiceName, *consumerServiceNamespace, *consumerPort, suffix)
}
// sendOneConsumeCPURequest sends POST request for cpu consumption
func (c *Controller) sendOneConsumeCPURequest(w http.ResponseWriter, millicores int, durationSec int) {
defer c.waitGroup.Done()
query := createConsumerURL(ConsumeCPUAddress)
_, err := http.PostForm(query, url.Values{MillicoresQuery: {strconv.Itoa(millicores)}, DurationSecQuery: {strconv.Itoa(durationSec)}})
c.responseWriterLock.Lock()
defer c.responseWriterLock.Unlock()
if err != nil {
fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
return
}
fmt.Fprintf(w, "Consumed %d millicores\n", millicores)
}
// sendOneConsumeMemRequest sends POST request for memory consumption
func (c *Controller) sendOneConsumeMemRequest(w http.ResponseWriter, megabytes int, durationSec int) {
defer c.waitGroup.Done()
query := createConsumerURL(ConsumeMemAddress)
_, err := http.PostForm(query, url.Values{MegabytesQuery: {strconv.Itoa(megabytes)}, DurationSecQuery: {strconv.Itoa(durationSec)}})
c.responseWriterLock.Lock()
defer c.responseWriterLock.Unlock()
if err != nil {
fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
return
}
fmt.Fprintf(w, "Consumed %d megabytes\n", megabytes)
}
// sendOneConsumeCustomMetric sends POST request for custom metric consumption
func (c *Controller) sendOneConsumeCustomMetric(w http.ResponseWriter, customMetricName string, delta int, durationSec int) {
defer c.waitGroup.Done()
query := createConsumerURL(BumpMetricAddress)
_, err := http.PostForm(query,
url.Values{MetricNameQuery: {customMetricName}, DurationSecQuery: {strconv.Itoa(durationSec)}, DeltaQuery: {strconv.Itoa(delta)}})
c.responseWriterLock.Lock()
defer c.responseWriterLock.Unlock()
if err != nil {
fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
return
}
fmt.Fprintf(w, "Bumped metric %s by %d\n", customMetricName, delta)
}

View File

@ -23,23 +23,8 @@ import (
"strconv"
"sync"
"time"
)
const (
badRequest = "Bad request. Not a POST request"
unknownFunction = "unknown function"
incorrectFunctionArgument = "incorrect function argument"
notGivenFunctionArgument = "not given function argument"
consumeCPUAddress = "/ConsumeCPU"
consumeMemAddress = "/ConsumeMem"
bumpMetricAddress = "/BumpMetric"
getCurrentStatusAddress = "/GetCurrentStatus"
metricsAddress = "/metrics"
millicoresQuery = "millicores"
megabytesQuery = "megabytes"
metricNameQuery = "metric"
deltaQuery = "delta"
durationSecQuery = "durationSec"
. "k8s.io/kubernetes/test/images/resource-consumer/common"
)
type ResourceConsumerHandler struct {
@ -53,12 +38,12 @@ func NewResourceConsumerHandler() *ResourceConsumerHandler {
func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// handle exposing metrics in Prometheus format (both GET & POST)
if req.URL.Path == metricsAddress {
if req.URL.Path == MetricsAddress {
handler.handleMetrics(w)
return
}
if req.Method != "POST" {
http.Error(w, badRequest, http.StatusBadRequest)
http.Error(w, BadRequest, http.StatusBadRequest)
return
}
// parsing POST request data and URL data
@ -67,34 +52,34 @@ func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *ht
return
}
// handle consumeCPU
if req.URL.Path == consumeCPUAddress {
if req.URL.Path == ConsumeCPUAddress {
handler.handleConsumeCPU(w, req.Form)
return
}
// handle consumeMem
if req.URL.Path == consumeMemAddress {
if req.URL.Path == ConsumeMemAddress {
handler.handleConsumeMem(w, req.Form)
return
}
// handle getCurrentStatus
if req.URL.Path == getCurrentStatusAddress {
if req.URL.Path == GetCurrentStatusAddress {
handler.handleGetCurrentStatus(w)
return
}
// handle bumpMetric
if req.URL.Path == bumpMetricAddress {
if req.URL.Path == BumpMetricAddress {
handler.handleBumpMetric(w, req.Form)
return
}
http.Error(w, unknownFunction, http.StatusNotFound)
http.Error(w, fmt.Sprintf("%s: %s", UnknownFunction, req.URL.Path), http.StatusNotFound)
}
func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
// geting string data for consumeCPU
durationSecString := query.Get(durationSecQuery)
millicoresString := query.Get(millicoresQuery)
durationSecString := query.Get(DurationSecQuery)
millicoresString := query.Get(MillicoresQuery)
if durationSecString == "" || millicoresString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest)
return
}
@ -102,22 +87,22 @@ func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter,
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
fmt.Fprintln(w, ConsumeCPUAddress[1:])
fmt.Fprintln(w, millicores, MillicoresQuery)
fmt.Fprintln(w, durationSec, DurationSecQuery)
}
func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
// geting string data for consumeMem
durationSecString := query.Get(durationSecQuery)
megabytesString := query.Get(megabytesQuery)
durationSecString := query.Get(DurationSecQuery)
megabytesString := query.Get(MegabytesQuery)
if durationSecString == "" || megabytesString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest)
return
}
@ -125,20 +110,20 @@ func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter,
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
fmt.Fprintln(w, ConsumeMemAddress[1:])
fmt.Fprintln(w, megabytes, MegabytesQuery)
fmt.Fprintln(w, durationSec, DurationSecQuery)
}
func (handler *ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
GetCurrentStatus()
fmt.Fprintln(w, "Warning: not implemented!")
fmt.Fprint(w, getCurrentStatusAddress[1:])
fmt.Fprint(w, GetCurrentStatusAddress[1:])
}
func (handler *ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
@ -169,11 +154,11 @@ func (handler *ResourceConsumerHandler) bumpMetric(metric string, delta float64,
func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
// geting string data for handleBumpMetric
metric := query.Get(metricNameQuery)
deltaString := query.Get(deltaQuery)
durationSecString := query.Get(durationSecQuery)
metric := query.Get(MetricNameQuery)
deltaString := query.Get(DeltaQuery)
durationSecString := query.Get(DurationSecQuery)
if durationSecString == "" || metric == "" || deltaString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest)
return
}
@ -181,13 +166,13 @@ func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter,
durationSec, durationSecError := strconv.Atoi(durationSecString)
delta, deltaError := strconv.ParseFloat(deltaString, 64)
if durationSecError != nil || deltaError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest)
return
}
go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
fmt.Fprintln(w, bumpMetricAddress[1:])
fmt.Fprintln(w, metric, metricNameQuery)
fmt.Fprintln(w, delta, deltaQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
fmt.Fprintln(w, BumpMetricAddress[1:])
fmt.Fprintln(w, metric, MetricNameQuery)
fmt.Fprintln(w, delta, DeltaQuery)
fmt.Fprintln(w, durationSec, DurationSecQuery)
}