diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 290eb4045d8..796afbb6d03 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -70,6 +70,9 @@ configure-cbr0 configure-cloud-routes conntrack-max conntrack-tcp-timeout-established +consumer-port +consumer-service-name +consumer-service-namespace contain-pod-resources container-port container-runtime @@ -463,3 +466,4 @@ watch-only whitelist-override-label windows-line-endings www-prefix + diff --git a/test/e2e/autoscaling_utils.go b/test/e2e/autoscaling_utils.go index af30b01be5b..9a995a685df 100644 --- a/test/e2e/autoscaling_utils.go +++ b/test/e2e/autoscaling_utils.go @@ -40,7 +40,8 @@ const ( timeoutRC = 120 * time.Second startServiceTimeout = time.Minute startServiceInterval = 5 * time.Second - resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta2" + resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta4" + resourceConsumerControllerImage = "gcr.io/google_containers/resource_consumer/controller:beta4" rcIsNil = "ERROR: replicationController = nil" deploymentIsNil = "ERROR: deployment = nil" rsIsNil = "ERROR: replicaset = nil" @@ -58,6 +59,7 @@ rc.ConsumeCPU(300) */ type ResourceConsumer struct { name string + controllerName string kind string framework *framework.Framework cpu chan int @@ -97,6 +99,7 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo runServiceAndWorkloadForResourceConsumer(f.Client, f.Namespace.Name, name, kind, replicas, cpuLimit, memLimit) rc := &ResourceConsumer{ name: name, + controllerName: name + "-controller", kind: kind, framework: f, cpu: make(chan int), @@ -111,8 +114,10 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo requestSizeInMegabytes: requestSizeInMegabytes, requestSizeCustomMetric: requestSizeCustomMetric, } + go rc.makeConsumeCPURequests() rc.ConsumeCPU(initCPUTotal) + go rc.makeConsumeMemRequests() rc.ConsumeMem(initMemoryTotal) go rc.makeConsumeCustomMetric() @@ -140,25 +145,15 @@ func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) { func (rc *ResourceConsumer) makeConsumeCPURequests() { defer GinkgoRecover() - var count int - var rest int sleepTime := time.Duration(0) + millicores := 0 for { select { - case millicores := <-rc.cpu: - framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores) - if rc.requestSizeInMillicores != 0 { - count = millicores / rc.requestSizeInMillicores - } - rest = millicores - count*rc.requestSizeInMillicores + case millicores = <-rc.cpu: + framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores) case <-time.After(sleepTime): - framework.Logf("RC %s: sending %v requests to consume %v millicores each and 1 request to consume %v millicores", rc.name, count, rc.requestSizeInMillicores, rest) - if count > 0 { - rc.sendConsumeCPURequests(count, rc.requestSizeInMillicores, rc.consumptionTimeInSeconds) - } - if rest > 0 { - go rc.sendOneConsumeCPURequest(rest, rc.consumptionTimeInSeconds) - } + framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores) + rc.sendConsumeCPURequest(millicores) sleepTime = rc.sleepTime case <-rc.stopCPU: return @@ -168,25 +163,15 @@ func (rc *ResourceConsumer) makeConsumeCPURequests() { func (rc *ResourceConsumer) makeConsumeMemRequests() { defer GinkgoRecover() - var count int - var rest int sleepTime := time.Duration(0) + megabytes := 0 for { select { - case megabytes := <-rc.mem: - framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes) - if rc.requestSizeInMegabytes != 0 { - count = megabytes / rc.requestSizeInMegabytes - } - rest = megabytes - count*rc.requestSizeInMegabytes + case megabytes = <-rc.mem: + framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes) case <-time.After(sleepTime): - framework.Logf("RC %s: sending %v requests to consume %v MB each and 1 request to consume %v MB", rc.name, count, rc.requestSizeInMegabytes, rest) - if count > 0 { - rc.sendConsumeMemRequests(count, rc.requestSizeInMegabytes, rc.consumptionTimeInSeconds) - } - if rest > 0 { - go rc.sendOneConsumeMemRequest(rest, rc.consumptionTimeInSeconds) - } + framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes) + rc.sendConsumeMemRequest(megabytes) sleepTime = rc.sleepTime case <-rc.stopMem: return @@ -196,26 +181,15 @@ func (rc *ResourceConsumer) makeConsumeMemRequests() { func (rc *ResourceConsumer) makeConsumeCustomMetric() { defer GinkgoRecover() - var count int - var rest int sleepTime := time.Duration(0) + delta := 0 for { select { - case total := <-rc.customMetric: - framework.Logf("RC %s: consume custom metric %v in total", rc.name, total) - if rc.requestSizeInMegabytes != 0 { - count = total / rc.requestSizeCustomMetric - } - rest = total - count*rc.requestSizeCustomMetric + case delta := <-rc.customMetric: + framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta) case <-time.After(sleepTime): - framework.Logf("RC %s: sending %v requests to consume %v custom metric each and 1 request to consume %v", - rc.name, count, rc.requestSizeCustomMetric, rest) - if count > 0 { - rc.sendConsumeCustomMetric(count, rc.requestSizeCustomMetric, rc.consumptionTimeInSeconds) - } - if rest > 0 { - go rc.sendOneConsumeCustomMetric(rest, rc.consumptionTimeInSeconds) - } + framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName) + rc.sendConsumeCustomMetric(delta) sleepTime = rc.sleepTime case <-rc.stopCustomMetric: return @@ -223,64 +197,48 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() { } } -func (rc *ResourceConsumer) sendConsumeCPURequests(requests, millicores, durationSec int) { - for i := 0; i < requests; i++ { - go rc.sendOneConsumeCPURequest(millicores, durationSec) - } -} - -func (rc *ResourceConsumer) sendConsumeMemRequests(requests, megabytes, durationSec int) { - for i := 0; i < requests; i++ { - go rc.sendOneConsumeMemRequest(megabytes, durationSec) - } -} - -func (rc *ResourceConsumer) sendConsumeCustomMetric(requests, delta, durationSec int) { - for i := 0; i < requests; i++ { - go rc.sendOneConsumeCustomMetric(delta, durationSec) - } -} - -// sendOneConsumeCPURequest sends POST request for cpu consumption -func (rc *ResourceConsumer) sendOneConsumeCPURequest(millicores int, durationSec int) { - defer GinkgoRecover() +func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) { proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post()) framework.ExpectNoError(err) - _, err = proxyRequest.Namespace(rc.framework.Namespace.Name). - Name(rc.name). + req := proxyRequest.Namespace(rc.framework.Namespace.Name). + Name(rc.controllerName). Suffix("ConsumeCPU"). Param("millicores", strconv.Itoa(millicores)). - Param("durationSec", strconv.Itoa(durationSec)). - DoRaw() + Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)). + Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores)) + framework.Logf("URL: %v", *req.URL()) + _, err = req.DoRaw() framework.ExpectNoError(err) } -// sendOneConsumeMemRequest sends POST request for memory consumption -func (rc *ResourceConsumer) sendOneConsumeMemRequest(megabytes int, durationSec int) { - defer GinkgoRecover() +// sendConsumeMemRequest sends POST request for memory consumption +func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) { proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post()) framework.ExpectNoError(err) - _, err = proxyRequest.Namespace(rc.framework.Namespace.Name). - Name(rc.name). + req := proxyRequest.Namespace(rc.framework.Namespace.Name). + Name(rc.controllerName). Suffix("ConsumeMem"). Param("megabytes", strconv.Itoa(megabytes)). - Param("durationSec", strconv.Itoa(durationSec)). - DoRaw() + Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)). + Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes)) + framework.Logf("URL: %v", *req.URL()) + _, err = req.DoRaw() framework.ExpectNoError(err) } -// sendOneConsumeCustomMetric sends POST request for custom metric consumption -func (rc *ResourceConsumer) sendOneConsumeCustomMetric(delta int, durationSec int) { - defer GinkgoRecover() +// sendConsumeCustomMetric sends POST request for custom metric consumption +func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) { proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post()) framework.ExpectNoError(err) - _, err = proxyRequest.Namespace(rc.framework.Namespace.Name). - Name(rc.name). + req := proxyRequest.Namespace(rc.framework.Namespace.Name). + Name(rc.controllerName). Suffix("BumpMetric"). Param("metric", customMetricName). Param("delta", strconv.Itoa(delta)). - Param("durationSec", strconv.Itoa(durationSec)). - DoRaw() + Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)). + Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric)) + framework.Logf("URL: %v", *req.URL()) + _, err = req.DoRaw() framework.ExpectNoError(err) } @@ -346,6 +304,8 @@ func (rc *ResourceConsumer) CleanUp() { time.Sleep(10 * time.Second) framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.name)) framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.name)) + framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.controllerName)) + framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.controllerName)) } func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind string, replicas int, cpuLimitMillis, memLimitMb int64) { @@ -400,6 +360,38 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s framework.Failf(invalidKind) } + By(fmt.Sprintf("Running controller")) + controllerName := name + "-controller" + _, err = c.Services(ns).Create(&api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: controllerName, + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{ + Port: port, + TargetPort: intstr.FromInt(targetPort), + }}, + + Selector: map[string]string{ + "name": controllerName, + }, + }, + }) + framework.ExpectNoError(err) + + dnsClusterFirst := api.DNSClusterFirst + controllerRcConfig := framework.RCConfig{ + Client: c, + Image: resourceConsumerControllerImage, + Name: controllerName, + Namespace: ns, + Timeout: timeoutRC, + Replicas: 1, + Command: []string{"/controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"}, + DNSPolicy: &dnsClusterFirst, + } + framework.ExpectNoError(framework.RunRC(controllerRcConfig)) + // Make sure endpoints are propagated. // TODO(piosz): replace sleep with endpoints watch. time.Sleep(10 * time.Second) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 0f37c5b9da4..f970e7c3898 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -271,6 +271,7 @@ type RCConfig struct { MemRequest int64 // bytes MemLimit int64 // bytes ReadinessProbe *api.Probe + DNSPolicy *api.DNSPolicy // Env vars, set the same for every pod. Env map[string]string @@ -2181,6 +2182,10 @@ func RunRC(config RCConfig) error { func (config *RCConfig) create() error { By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace)) + dnsDefault := api.DNSDefault + if config.DNSPolicy == nil { + config.DNSPolicy = &dnsDefault + } rc := &api.ReplicationController{ ObjectMeta: api.ObjectMeta{ Name: config.Name, @@ -2204,7 +2209,7 @@ func (config *RCConfig) create() error { ReadinessProbe: config.ReadinessProbe, }, }, - DNSPolicy: api.DNSDefault, + DNSPolicy: *config.DNSPolicy, }, }, }, diff --git a/test/images/resource-consumer/Makefile b/test/images/resource-consumer/Makefile index b0689166101..f117558cd55 100644 --- a/test/images/resource-consumer/Makefile +++ b/test/images/resource-consumer/Makefile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -TAG = beta2 +TAG = beta4 PREFIX = gcr.io/google_containers all: clean consumer @@ -20,18 +20,22 @@ all: clean consumer consumer: CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consume-cpu/consume-cpu ./consume-cpu/consume_cpu.go CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consumer . + CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o controller/controller ./controller/controller.go container: image image: sudo docker build -t $(PREFIX)/resource_consumer:$(TAG) . + sudo docker build -t $(PREFIX)/resource_consumer/controller:$(TAG) controller run_container: docker run --publish=8080:8080 $(PREFIX)/resource_consumer:$(TAG) push: - @echo "This image is not meant to be pushed." + gcloud docker push ${PREFIX}/resource_consumer:${TAG} + gcloud docker push ${PREFIX}/resource_consumer/controller:${TAG} clean: rm -f consumer rm -f consume-cpu/consume-cpu + rm -f controller/controller diff --git a/test/images/resource-consumer/common/common.go b/test/images/resource-consumer/common/common.go new file mode 100644 index 00000000000..cce32781ce6 --- /dev/null +++ b/test/images/resource-consumer/common/common.go @@ -0,0 +1,41 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +const ( + ConsumeCPUAddress = "/ConsumeCPU" + ConsumeMemAddress = "/ConsumeMem" + BumpMetricAddress = "/BumpMetric" + GetCurrentStatusAddress = "/GetCurrentStatus" + MetricsAddress = "/Metrics" + + MillicoresQuery = "millicores" + MegabytesQuery = "megabytes" + MetricNameQuery = "metric" + DeltaQuery = "delta" + DurationSecQuery = "durationSec" + RequestSizeInMillicoresQuery = "requestSizeMillicores" + RequestSizeInMegabytesQuery = "requestSizeMegabytes" + RequestSizeCustomMetricQuery = "requestSizeMetrics" + + BadRequest = "Bad request. Not a POST request" + UnknownFunction = "unknown function" + IncorrectFunctionArgument = "incorrect function argument" + NotGivenFunctionArgument = "not given function argument" + + FrameworkName = "horizontal-pod-autoscaling" +) diff --git a/test/images/resource-consumer/controller/Dockerfile b/test/images/resource-consumer/controller/Dockerfile new file mode 100644 index 00000000000..e60734d15c5 --- /dev/null +++ b/test/images/resource-consumer/controller/Dockerfile @@ -0,0 +1,19 @@ +# Copyright 2016 The Kubernetes Authors All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM busybox +MAINTAINER Jerzy Szczepkowski +ADD controller /controller +EXPOSE 8080 +ENTRYPOINT ["/controller"] diff --git a/test/images/resource-consumer/controller/controller.go b/test/images/resource-consumer/controller/controller.go new file mode 100644 index 00000000000..404fb4e3195 --- /dev/null +++ b/test/images/resource-consumer/controller/controller.go @@ -0,0 +1,243 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "net/url" + "strconv" + "sync" + + . "k8s.io/kubernetes/test/images/resource-consumer/common" +) + +var port = flag.Int("port", 8080, "Port number.") +var consumerPort = flag.Int("consumer-port", 8080, "Port number of consumers.") +var consumerServiceName = flag.String("consumer-service-name", "resource-consumer", "Name of service containing resource consumers.") +var consumerServiceNamespace = flag.String("consumer-service-namespace", "default", "Namespace of service containing resource consumers.") + +func main() { + flag.Parse() + mgr := NewController() + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), mgr)) +} + +type Controller struct { + responseWriterLock sync.Mutex + waitGroup sync.WaitGroup +} + +func NewController() *Controller { + c := &Controller{} + return c +} + +func (handler *Controller) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + http.Error(w, BadRequest, http.StatusBadRequest) + return + } + // parsing POST request data and URL data + if err := req.ParseForm(); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + // handle consumeCPU + if req.URL.Path == ConsumeCPUAddress { + handler.handleConsumeCPU(w, req.Form) + return + } + // handle consumeMem + if req.URL.Path == ConsumeMemAddress { + handler.handleConsumeMem(w, req.Form) + return + } + // handle bumpMetric + if req.URL.Path == BumpMetricAddress { + handler.handleBumpMetric(w, req.Form) + return + } + http.Error(w, UnknownFunction, http.StatusNotFound) +} + +func (handler *Controller) handleConsumeCPU(w http.ResponseWriter, query url.Values) { + // geting string data for consumeCPU + durationSecString := query.Get(DurationSecQuery) + millicoresString := query.Get(MillicoresQuery) + requestSizeInMillicoresString := query.Get(RequestSizeInMillicoresQuery) + if durationSecString == "" || millicoresString == "" || requestSizeInMillicoresString == "" { + http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) + return + } + + // convert data (strings to ints) for consumeCPU + durationSec, durationSecError := strconv.Atoi(durationSecString) + millicores, millicoresError := strconv.Atoi(millicoresString) + requestSizeInMillicores, requestSizeInMillicoresError := strconv.Atoi(requestSizeInMillicoresString) + if durationSecError != nil || millicoresError != nil || requestSizeInMillicoresError != nil || requestSizeInMillicores <= 0 { + http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) + return + } + + count := millicores / requestSizeInMillicores + rest := millicores - count*requestSizeInMillicores + fmt.Fprintf(w, "RC manager: sending %v requests to consume %v millicores each and 1 request to consume %v millicores\n", count, requestSizeInMillicores, rest) + if count > 0 { + handler.waitGroup.Add(count) + handler.sendConsumeCPURequests(w, count, requestSizeInMillicores, durationSec) + } + if rest > 0 { + handler.waitGroup.Add(1) + go handler.sendOneConsumeCPURequest(w, rest, durationSec) + } + handler.waitGroup.Wait() +} + +func (handler *Controller) handleConsumeMem(w http.ResponseWriter, query url.Values) { + // geting string data for consumeMem + durationSecString := query.Get(DurationSecQuery) + megabytesString := query.Get(MegabytesQuery) + requestSizeInMegabytesString := query.Get(RequestSizeInMegabytesQuery) + if durationSecString == "" || megabytesString == "" || requestSizeInMegabytesString == "" { + http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) + return + } + + // convert data (strings to ints) for consumeMem + durationSec, durationSecError := strconv.Atoi(durationSecString) + megabytes, megabytesError := strconv.Atoi(megabytesString) + requestSizeInMegabytes, requestSizeInMegabytesError := strconv.Atoi(requestSizeInMegabytesString) + if durationSecError != nil || megabytesError != nil || requestSizeInMegabytesError != nil || requestSizeInMegabytes <= 0 { + http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) + return + } + + count := megabytes / requestSizeInMegabytes + rest := megabytes - count*requestSizeInMegabytes + fmt.Fprintf(w, "RC manager: sending %v requests to consume %v MB each and 1 request to consume %v MB\n", count, requestSizeInMegabytes, rest) + if count > 0 { + handler.waitGroup.Add(count) + handler.sendConsumeMemRequests(w, count, requestSizeInMegabytes, durationSec) + } + if rest > 0 { + handler.waitGroup.Add(1) + go handler.sendOneConsumeMemRequest(w, rest, durationSec) + } + handler.waitGroup.Wait() +} + +func (handler *Controller) handleBumpMetric(w http.ResponseWriter, query url.Values) { + // geting string data for handleBumpMetric + metric := query.Get(MetricNameQuery) + deltaString := query.Get(DeltaQuery) + durationSecString := query.Get(DurationSecQuery) + requestSizeCustomMetricString := query.Get(RequestSizeCustomMetricQuery) + if durationSecString == "" || metric == "" || deltaString == "" || requestSizeCustomMetricString == "" { + http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) + return + } + + // convert data (strings to ints/floats) for handleBumpMetric + durationSec, durationSecError := strconv.Atoi(durationSecString) + delta, deltaError := strconv.Atoi(deltaString) + requestSizeCustomMetric, requestSizeCustomMetricError := strconv.Atoi(requestSizeCustomMetricString) + if durationSecError != nil || deltaError != nil || requestSizeCustomMetricError != nil || requestSizeCustomMetric <= 0 { + http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) + return + } + + count := delta / requestSizeCustomMetric + rest := delta - count*requestSizeCustomMetric + fmt.Fprintf(w, "RC manager: sending %v requests to bump custom metric by %v each and 1 request to bump by %v\n", count, requestSizeCustomMetric, rest) + if count > 0 { + handler.waitGroup.Add(count) + handler.sendConsumeCustomMetric(w, metric, count, requestSizeCustomMetric, durationSec) + } + if rest > 0 { + handler.waitGroup.Add(1) + go handler.sendOneConsumeCustomMetric(w, metric, rest, durationSec) + } + handler.waitGroup.Wait() +} + +func (manager *Controller) sendConsumeCPURequests(w http.ResponseWriter, requests, millicores, durationSec int) { + for i := 0; i < requests; i++ { + go manager.sendOneConsumeCPURequest(w, millicores, durationSec) + } +} + +func (manager *Controller) sendConsumeMemRequests(w http.ResponseWriter, requests, megabytes, durationSec int) { + for i := 0; i < requests; i++ { + go manager.sendOneConsumeMemRequest(w, megabytes, durationSec) + } +} + +func (manager *Controller) sendConsumeCustomMetric(w http.ResponseWriter, metric string, requests, delta, durationSec int) { + for i := 0; i < requests; i++ { + go manager.sendOneConsumeCustomMetric(w, metric, delta, durationSec) + } +} + +func createConsumerURL(suffix string) string { + return fmt.Sprintf("http://%s.%s.svc.cluster.local:%d%s", *consumerServiceName, *consumerServiceNamespace, *consumerPort, suffix) +} + +// sendOneConsumeCPURequest sends POST request for cpu consumption +func (c *Controller) sendOneConsumeCPURequest(w http.ResponseWriter, millicores int, durationSec int) { + defer c.waitGroup.Done() + query := createConsumerURL(ConsumeCPUAddress) + _, err := http.PostForm(query, url.Values{MillicoresQuery: {strconv.Itoa(millicores)}, DurationSecQuery: {strconv.Itoa(durationSec)}}) + c.responseWriterLock.Lock() + defer c.responseWriterLock.Unlock() + if err != nil { + fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err) + return + } + fmt.Fprintf(w, "Consumed %d millicores\n", millicores) +} + +// sendOneConsumeMemRequest sends POST request for memory consumption +func (c *Controller) sendOneConsumeMemRequest(w http.ResponseWriter, megabytes int, durationSec int) { + defer c.waitGroup.Done() + query := createConsumerURL(ConsumeMemAddress) + _, err := http.PostForm(query, url.Values{MegabytesQuery: {strconv.Itoa(megabytes)}, DurationSecQuery: {strconv.Itoa(durationSec)}}) + c.responseWriterLock.Lock() + defer c.responseWriterLock.Unlock() + if err != nil { + fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err) + return + } + fmt.Fprintf(w, "Consumed %d megabytes\n", megabytes) +} + +// sendOneConsumeCustomMetric sends POST request for custom metric consumption +func (c *Controller) sendOneConsumeCustomMetric(w http.ResponseWriter, customMetricName string, delta int, durationSec int) { + defer c.waitGroup.Done() + query := createConsumerURL(BumpMetricAddress) + _, err := http.PostForm(query, + url.Values{MetricNameQuery: {customMetricName}, DurationSecQuery: {strconv.Itoa(durationSec)}, DeltaQuery: {strconv.Itoa(delta)}}) + c.responseWriterLock.Lock() + defer c.responseWriterLock.Unlock() + if err != nil { + fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err) + return + } + fmt.Fprintf(w, "Bumped metric %s by %d\n", customMetricName, delta) +} diff --git a/test/images/resource-consumer/resource_consumer_handler.go b/test/images/resource-consumer/resource_consumer_handler.go index 1459d0423fd..4960b6a5f74 100644 --- a/test/images/resource-consumer/resource_consumer_handler.go +++ b/test/images/resource-consumer/resource_consumer_handler.go @@ -23,23 +23,8 @@ import ( "strconv" "sync" "time" -) -const ( - badRequest = "Bad request. Not a POST request" - unknownFunction = "unknown function" - incorrectFunctionArgument = "incorrect function argument" - notGivenFunctionArgument = "not given function argument" - consumeCPUAddress = "/ConsumeCPU" - consumeMemAddress = "/ConsumeMem" - bumpMetricAddress = "/BumpMetric" - getCurrentStatusAddress = "/GetCurrentStatus" - metricsAddress = "/metrics" - millicoresQuery = "millicores" - megabytesQuery = "megabytes" - metricNameQuery = "metric" - deltaQuery = "delta" - durationSecQuery = "durationSec" + . "k8s.io/kubernetes/test/images/resource-consumer/common" ) type ResourceConsumerHandler struct { @@ -53,12 +38,12 @@ func NewResourceConsumerHandler() *ResourceConsumerHandler { func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // handle exposing metrics in Prometheus format (both GET & POST) - if req.URL.Path == metricsAddress { + if req.URL.Path == MetricsAddress { handler.handleMetrics(w) return } if req.Method != "POST" { - http.Error(w, badRequest, http.StatusBadRequest) + http.Error(w, BadRequest, http.StatusBadRequest) return } // parsing POST request data and URL data @@ -67,34 +52,34 @@ func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *ht return } // handle consumeCPU - if req.URL.Path == consumeCPUAddress { + if req.URL.Path == ConsumeCPUAddress { handler.handleConsumeCPU(w, req.Form) return } // handle consumeMem - if req.URL.Path == consumeMemAddress { + if req.URL.Path == ConsumeMemAddress { handler.handleConsumeMem(w, req.Form) return } // handle getCurrentStatus - if req.URL.Path == getCurrentStatusAddress { + if req.URL.Path == GetCurrentStatusAddress { handler.handleGetCurrentStatus(w) return } // handle bumpMetric - if req.URL.Path == bumpMetricAddress { + if req.URL.Path == BumpMetricAddress { handler.handleBumpMetric(w, req.Form) return } - http.Error(w, unknownFunction, http.StatusNotFound) + http.Error(w, fmt.Sprintf("%s: %s", UnknownFunction, req.URL.Path), http.StatusNotFound) } func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, query url.Values) { // geting string data for consumeCPU - durationSecString := query.Get(durationSecQuery) - millicoresString := query.Get(millicoresQuery) + durationSecString := query.Get(DurationSecQuery) + millicoresString := query.Get(MillicoresQuery) if durationSecString == "" || millicoresString == "" { - http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) + http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) return } @@ -102,22 +87,22 @@ func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, durationSec, durationSecError := strconv.Atoi(durationSecString) millicores, millicoresError := strconv.Atoi(millicoresString) if durationSecError != nil || millicoresError != nil { - http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) + http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) return } go ConsumeCPU(millicores, durationSec) - fmt.Fprintln(w, consumeCPUAddress[1:]) - fmt.Fprintln(w, millicores, millicoresQuery) - fmt.Fprintln(w, durationSec, durationSecQuery) + fmt.Fprintln(w, ConsumeCPUAddress[1:]) + fmt.Fprintln(w, millicores, MillicoresQuery) + fmt.Fprintln(w, durationSec, DurationSecQuery) } func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) { // geting string data for consumeMem - durationSecString := query.Get(durationSecQuery) - megabytesString := query.Get(megabytesQuery) + durationSecString := query.Get(DurationSecQuery) + megabytesString := query.Get(MegabytesQuery) if durationSecString == "" || megabytesString == "" { - http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) + http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) return } @@ -125,20 +110,20 @@ func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, durationSec, durationSecError := strconv.Atoi(durationSecString) megabytes, megabytesError := strconv.Atoi(megabytesString) if durationSecError != nil || megabytesError != nil { - http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) + http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) return } go ConsumeMem(megabytes, durationSec) - fmt.Fprintln(w, consumeMemAddress[1:]) - fmt.Fprintln(w, megabytes, megabytesQuery) - fmt.Fprintln(w, durationSec, durationSecQuery) + fmt.Fprintln(w, ConsumeMemAddress[1:]) + fmt.Fprintln(w, megabytes, MegabytesQuery) + fmt.Fprintln(w, durationSec, DurationSecQuery) } func (handler *ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) { GetCurrentStatus() fmt.Fprintln(w, "Warning: not implemented!") - fmt.Fprint(w, getCurrentStatusAddress[1:]) + fmt.Fprint(w, GetCurrentStatusAddress[1:]) } func (handler *ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) { @@ -169,11 +154,11 @@ func (handler *ResourceConsumerHandler) bumpMetric(metric string, delta float64, func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) { // geting string data for handleBumpMetric - metric := query.Get(metricNameQuery) - deltaString := query.Get(deltaQuery) - durationSecString := query.Get(durationSecQuery) + metric := query.Get(MetricNameQuery) + deltaString := query.Get(DeltaQuery) + durationSecString := query.Get(DurationSecQuery) if durationSecString == "" || metric == "" || deltaString == "" { - http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) + http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) return } @@ -181,13 +166,13 @@ func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, durationSec, durationSecError := strconv.Atoi(durationSecString) delta, deltaError := strconv.ParseFloat(deltaString, 64) if durationSecError != nil || deltaError != nil { - http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) + http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) return } go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second) - fmt.Fprintln(w, bumpMetricAddress[1:]) - fmt.Fprintln(w, metric, metricNameQuery) - fmt.Fprintln(w, delta, deltaQuery) - fmt.Fprintln(w, durationSec, durationSecQuery) + fmt.Fprintln(w, BumpMetricAddress[1:]) + fmt.Fprintln(w, metric, MetricNameQuery) + fmt.Fprintln(w, delta, DeltaQuery) + fmt.Fprintln(w, durationSec, DurationSecQuery) }