diff --git a/pkg/api/v1/conversion.go b/pkg/api/v1/conversion.go index 1549972a6eb..4fb95230eb9 100644 --- a/pkg/api/v1/conversion.go +++ b/pkg/api/v1/conversion.go @@ -141,6 +141,19 @@ func addConversionFuncs() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = api.Scheme.AddFieldLabelConversionFunc("v1beta3", "Endpoints", + func(label, value string) (string, string, error) { + switch label { + case "metadata.name": + return label, value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } } func convert_api_ReplicationControllerSpec_To_v1_ReplicationControllerSpec(in *api.ReplicationControllerSpec, out *ReplicationControllerSpec, s conversion.Scope) error { diff --git a/pkg/api/v1beta3/conversion.go b/pkg/api/v1beta3/conversion.go index baa9d907f8c..f0f779fed0c 100644 --- a/pkg/api/v1beta3/conversion.go +++ b/pkg/api/v1beta3/conversion.go @@ -153,6 +153,19 @@ func addConversionFuncs() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = api.Scheme.AddFieldLabelConversionFunc("v1beta3", "Endpoints", + func(label, value string) (string, string, error) { + switch label { + case "metadata.name": + return label, value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } } func convert_v1beta3_Container_To_api_Container(in *Container, out *api.Container, s conversion.Scope) error { diff --git a/pkg/runtime/scheme.go b/pkg/runtime/scheme.go index a5c5b50b304..e05d827092d 100644 --- a/pkg/runtime/scheme.go +++ b/pkg/runtime/scheme.go @@ -376,11 +376,11 @@ func (s *Scheme) Convert(in, out interface{}) error { // versioned representation to an unversioned one. func (s *Scheme) ConvertFieldLabel(version, kind, label, value string) (string, string, error) { if s.fieldLabelConversionFuncs[version] == nil { - return "", "", fmt.Errorf("No conversion function found for version: %s", version) + return "", "", fmt.Errorf("No field label conversion function found for version: %s", version) } conversionFunc, ok := s.fieldLabelConversionFuncs[version][kind] if !ok { - return "", "", fmt.Errorf("No conversion function found for version %s and kind %s", version, kind) + return "", "", fmt.Errorf("No field label conversion function found for version %s and kind %s", version, kind) } return conversionFunc(label, value) } diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go new file mode 100644 index 00000000000..fce68458a7e --- /dev/null +++ b/test/e2e/service_latency.go @@ -0,0 +1,181 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "sort" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +type durations []time.Duration + +func (d durations) Len() int { return len(d) } +func (d durations) Less(i, j int) bool { return d[i] < d[j] } +func (d durations) Swap(i, j int) { d[i], d[j] = d[j], d[i] } + +var _ = Describe("Service endpoints latency", func() { + f := NewFramework("svc-latency") + + It("should not be very high", func() { + nodes, err := f.Client.Nodes().List(labels.Everything(), fields.Everything()) + if err != nil { + Failf("Failed to list nodes: %v", err) + } + + count := len(nodes.Items) + d, err := runServiceLatencies(f, 8*count, 20*count) + Expect(err).NotTo(HaveOccurred()) + dSorted := durations(d) + sort.Sort(dSorted) + n := len(dSorted) + if n < 5 { + Expect(fmt.Errorf("Did not get a good enough sample size: %v", dSorted)).NotTo(HaveOccurred()) + return + } + percentile := func(p int) time.Duration { + est := n * p / 100 + if est >= n { + return dSorted[n-1] + } + return dSorted[est] + } + Logf("Latencies: %v", dSorted) + p50 := percentile(50) + p90 := percentile(90) + p99 := percentile(99) + Logf("50 %%ile: %v", p50) + Logf("90 %%ile: %v", p90) + Logf("99 %%ile: %v", p99) + + if p99 > 4*p50 { + Fail("Tail latency is > 4x median latency") + } + + if p50 > time.Second*20 { + Fail("Median latency should be less than 20 seconds") + } + }) +}) + +func runServiceLatencies(f *Framework, inParallel, total int) (output []time.Duration, err error) { + next := make(chan int, total) + go func() { + for i := 0; i < total; i++ { + next <- i + } + close(next) + }() + + errs := make(chan error, total) + durations := make(chan time.Duration, total) + + for i := 0; i < inParallel; i++ { + go func() { + defer GinkgoRecover() + for { + i, ok := <-next + if !ok { + return + } + if d, err := singleServiceLatency(f, i); err != nil { + errs <- err + } else { + durations <- d + } + } + }() + } + + errCount := 0 + for i := 0; i < total; i++ { + select { + case e := <-errs: + Expect(e).NotTo(HaveOccurred()) + errCount += 1 + case d := <-durations: + output = append(output, d) + } + } + if errCount != 0 { + return output, fmt.Errorf("got %v errors", errCount) + } + return output, nil +} + +func singleServiceLatency(f *Framework, i int) (time.Duration, error) { + // Make an RC with a single pod. + cfg := RCConfig{ + Client: f.Client, + Image: "gcr.io/google_containers/pause:1.0", + Name: fmt.Sprintf("trial-%v", i), + Namespace: f.Namespace.Name, + Replicas: 1, + PollInterval: 10, + } + if err := RunRC(cfg); err != nil { + return 0, err + } + defer DeleteRC(f.Client, f.Namespace.Name, cfg.Name) + + // Now make a service that points to that pod. + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: cfg.Name, + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{Protocol: api.ProtocolTCP, Port: 80}}, + Selector: map[string]string{"name": cfg.Name}, + Type: api.ServiceTypeClusterIP, + SessionAffinity: api.ServiceAffinityNone, + }, + } + gotSvc, err := f.Client.Services(f.Namespace.Name).Create(svc) + if err != nil { + return 0, err + } + + // Now time how long it takes for the endpoints to show up. + startTime := time.Now() + defer f.Client.Services(f.Namespace.Name).Delete(gotSvc.Name) + w, err := f.Client.Endpoints(f.Namespace.Name).Watch(labels.Everything(), fields.Set{"metadata.name": cfg.Name}.AsSelector(), gotSvc.ResourceVersion) + if err != nil { + return 0, err + } + defer w.Stop() + + for { + val, ok := <-w.ResultChan() + if !ok { + return 0, fmt.Errorf("watch closed") + } + if e, ok := val.Object.(*api.Endpoints); ok { + if e.Name == cfg.Name && len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { + stopTime := time.Now() + return stopTime.Sub(startTime), nil + } + } + } + +}