From 77da96c7f6891e5ad34fced0d310e2569bc89839 Mon Sep 17 00:00:00 2001
From: Satyadeep Musuvathy
Date: Wed, 28 Feb 2018 16:25:42 -0800
Subject: [PATCH] Performance tests and fix for IPAM controller.

Tests the four modes of allocation.
Can be run using ./test-performance.sh under the test/integration/ipamperf directory.
See ./test-performance.sh -h for supported flags.
---
 pkg/cloudprovider/providers/gce/support.go    |  11 +
 test/integration/BUILD                        |   1 +
 test/integration/ipamperf/BUILD               |  70 ++++++
 test/integration/ipamperf/README.md           |  64 +++++
 test/integration/ipamperf/cloud.go            | 154 +++++++++++++
 test/integration/ipamperf/ipam_test.go        | 150 ++++++++++++
 test/integration/ipamperf/main_test.go        |  32 +++
 test/integration/ipamperf/results.go          | 218 ++++++++++++++++++
 test/integration/ipamperf/test-performance.sh |  69 ++++++
 test/integration/ipamperf/util.go             |  80 +++++++
 test/integration/util/BUILD                   |   4 +
 test/integration/util/cloud.go                |  72 ++++++
 12 files changed, 925 insertions(+)
 create mode 100644 test/integration/ipamperf/BUILD
 create mode 100644 test/integration/ipamperf/README.md
 create mode 100644 test/integration/ipamperf/cloud.go
 create mode 100644 test/integration/ipamperf/ipam_test.go
 create mode 100644 test/integration/ipamperf/main_test.go
 create mode 100644 test/integration/ipamperf/results.go
 create mode 100755 test/integration/ipamperf/test-performance.sh
 create mode 100644 test/integration/ipamperf/util.go
 create mode 100644 test/integration/util/cloud.go

diff --git a/pkg/cloudprovider/providers/gce/support.go b/pkg/cloudprovider/providers/gce/support.go
index 42903af4579..37dc75b0b0a 100644
--- a/pkg/cloudprovider/providers/gce/support.go
+++ b/pkg/cloudprovider/providers/gce/support.go
@@ -64,3 +64,14 @@ func (l *gceRateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) er
 	}
 	return nil
 }
+
+// CreateGCECloudWithCloud is a helper function to create an instance of GCECloud with the
+// given Cloud interface implementation. Typical usage is to use cloud.NewMockGCE to get a
+// handle to a mock Cloud instance and then use that for testing.
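+//
+// A minimal sketch of that intended usage (illustrative only; it mirrors the
+// NewMockGCECloud helper added to test/integration/util/cloud.go later in this
+// patch, and the CloudConfig values here are placeholders):
+//
+//	mockGCE := cloud.NewMockGCE(nil)
+//	gceCloud, err := CreateGCECloudWithCloud(&CloudConfig{ProjectID: "test-project"}, mockGCE)
+//	if err != nil {
+//		// handle the error
+//	}
+//	_ = gceCloud // use the mock-backed GCECloud in tests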
+func CreateGCECloudWithCloud(config *CloudConfig, c cloud.Cloud) (*GCECloud, error) {
+	gceCloud, err := CreateGCECloud(config)
+	if err == nil {
+		gceCloud.c = c
+	}
+	return gceCloud, err
+}
diff --git a/test/integration/BUILD b/test/integration/BUILD
index c426000b868..67c3f1bfcf9 100644
--- a/test/integration/BUILD
+++ b/test/integration/BUILD
@@ -48,6 +48,7 @@ filegroup(
         "//test/integration/examples:all-srcs",
         "//test/integration/framework:all-srcs",
         "//test/integration/garbagecollector:all-srcs",
+        "//test/integration/ipamperf:all-srcs",
         "//test/integration/master:all-srcs",
         "//test/integration/metrics:all-srcs",
         "//test/integration/objectmeta:all-srcs",
diff --git a/test/integration/ipamperf/BUILD b/test/integration/ipamperf/BUILD
new file mode 100644
index 00000000000..ea3a8bd287f
--- /dev/null
+++ b/test/integration/ipamperf/BUILD
@@ -0,0 +1,70 @@
+package(default_visibility = ["//visibility:public"])
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_test(
+    name = "go_default_test",
+    size = "large",
+    srcs = [
+        "ipam_test.go",
+        "main_test.go",
+    ],
+    embed = [":go_default_library"],
+    tags = ["integration"],
+    deps = [
+        "//pkg/api/testapi:go_default_library",
+        "//pkg/controller/nodeipam:go_default_library",
+        "//pkg/controller/nodeipam/ipam:go_default_library",
+        "//test/integration/framework:go_default_library",
+        "//test/integration/util:go_default_library",
+        "//vendor/github.com/golang/glog:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/client-go/informers:go_default_library",
+        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
+        "//vendor/k8s.io/client-go/rest:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "cloud.go",
+        "results.go",
+        "util.go",
+    ],
+    importpath = "k8s.io/kubernetes/test/integration/ipamperf",
+    deps = [
+        "//pkg/api/testapi:go_default_library",
+        "//pkg/cloudprovider:go_default_library",
+        "//pkg/cloudprovider/providers/gce/cloud:go_default_library",
+        "//pkg/cloudprovider/providers/gce/cloud/meta:go_default_library",
+        "//pkg/controller/nodeipam/ipam:go_default_library",
+        "//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
+        "//pkg/controller/util/node:go_default_library",
+        "//test/integration/util:go_default_library",
+        "//vendor/github.com/golang/glog:go_default_library",
+        "//vendor/google.golang.org/api/compute/v0.beta:go_default_library",
+        "//vendor/google.golang.org/api/compute/v1:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/client-go/informers:go_default_library",
+        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
+        "//vendor/k8s.io/client-go/rest:go_default_library",
+        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
+    ],
+)
diff --git a/test/integration/ipamperf/README.md b/test/integration/ipamperf/README.md
new file mode 100644
index 00000000000..4c1e78720ac
--- /dev/null
+++ b/test/integration/ipamperf/README.md
@@ -0,0 +1,64 @@
+IPAM Performance Test
+=====
+
+Motivation
+-----
+We wanted to be able to test the behavior of the IPAM controller
+under various scenarios, by mocking and monitoring the edges that the controller interacts with.
+This has the following goals:
+
+- To save time on testing
+- To simulate various behaviors cheaply
+- To observe and model the ideal behavior of the IPAM controller code
+
+Currently the test runs through the 4 different IPAM controller modes for cases where the kube API QPS is a)
+equal to and b) significantly less than the number of nodes being added, to observe and quantify behavior.
+
+How to run
+-------
+
+```shell
+# In kubernetes root path
+make generated_files
+
+cd test/integration/ipamperf
+./test-performance.sh
+```
+
+The runner script supports a few different options:
+
+```shell
+./test-performance.sh -h
+usage: ./test-performance.sh [-h] [-d] [-r <pattern>] [-o <filename>]
+ -h display this help message
+ -d enable debug logs in tests
+ -r regex pattern to match for tests
+ -o file to write JSON formatted results to
+```
+
+The tests follow the pattern TestPerformance/{AllocatorType}-KubeQPS{X}-Nodes{Y}, where AllocatorType
+is one of
+
+- RangeAllocator
+- IPAMFromCluster
+- CloudAllocator
+- IPAMFromCloud
+
+and X represents the QPS configured for the Kubernetes API client, and Y is the number of nodes to create.
+
+The -d flag sets the -v level for glog to 6, enabling nearly all of the debug logs in the code.
+
+So to run the test for CloudAllocator with 10 nodes, one can run
+
+```shell
+./test-performance.sh -r /CloudAllocator.*Nodes10$
+```
+
+At the end of the run, the results of all the tests are printed in JSON format. Passing the -o option
+also saves this JSON to the named file.
+
+Code Organization
+-----
+The core of the tests is defined in [ipam_test.go](ipam_test.go), using the t.Run() helper to control parallelism,
+as we want to be able to start the master only once. [cloud.go](cloud.go) contains the mock of the cloud server endpoint
+and can be configured to behave differently as needed by the various modes. The tracking of the node behavior and
+creation of the test results data is in [results.go](results.go).
diff --git a/test/integration/ipamperf/cloud.go b/test/integration/ipamperf/cloud.go
new file mode 100644
index 00000000000..c7cb10ec10f
--- /dev/null
+++ b/test/integration/ipamperf/cloud.go
@@ -0,0 +1,154 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipamperf
+
+import (
+	"context"
+	"net"
+	"sync"
+
+	beta "google.golang.org/api/compute/v0.beta"
+	ga "google.golang.org/api/compute/v1"
+	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
+	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
+	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
+	"k8s.io/kubernetes/test/integration/util"
+)
+
+// implementation note:
+// ------------------
+// cloud.go implements hooks and handler functions for the MockGCE cloud in order to meet expectations
+// of cloud behavior from the IPAM controllers.
+// The key constraint is that the IPAM code is spread across both GA and Beta instances, which are
+// distinct objects in the mock. We need to solve for
+//
+// 1. When a GET is called on an instance, we lazily create the instance with or without an assigned
+//    IP alias as needed by the IPAM controller type
+// 2. When we assign an IP alias for an instance, both the GA and Beta instance have to agree on the
+//    assigned alias range
+//
+// We solve both problems by using a baseInstanceList which maintains a list of known instances,
+// and their pre-assigned IP-alias ranges (if needed). We then create GetHooks for GA and Beta GetInstance
+// calls as closures over this baseInstanceList that can look up base instance data.
+//
+// This has the advantage that once the Get hook populates the GCEMock with the base data, we then let the
+// rest of the mock code run as is.
+
+// baseInstance tracks basic instance data needed by the IPAM controllers
+type baseInstance struct {
+	name       string
+	zone       string
+	aliasRange string
+}
+
+// baseInstanceList tracks a set of base instances
+type baseInstanceList struct {
+	allocateCIDR   bool
+	clusterCIDR    *net.IPNet
+	subnetMaskSize int
+	cidrSet        *cidrset.CidrSet
+
+	lock      sync.Mutex // protect access to instances
+	instances map[meta.Key]*baseInstance
+}
+
+// toGA is a utility method to return the baseInstance data as a GA Instance object
+func (bi *baseInstance) toGA() *ga.Instance {
+	inst := &ga.Instance{Name: bi.name, Zone: bi.zone, NetworkInterfaces: []*ga.NetworkInterface{{}}}
+	if bi.aliasRange != "" {
+		inst.NetworkInterfaces[0].AliasIpRanges = []*ga.AliasIpRange{
+			{IpCidrRange: bi.aliasRange, SubnetworkRangeName: util.TestSecondaryRangeName},
+		}
+	}
+	return inst
+}
+
+// toBeta is a utility method to return the baseInstance data as a beta Instance object
+func (bi *baseInstance) toBeta() *beta.Instance {
+	inst := &beta.Instance{Name: bi.name, Zone: bi.zone, NetworkInterfaces: []*beta.NetworkInterface{{}}}
+	if bi.aliasRange != "" {
+		inst.NetworkInterfaces[0].AliasIpRanges = []*beta.AliasIpRange{
+			{IpCidrRange: bi.aliasRange, SubnetworkRangeName: util.TestSecondaryRangeName},
+		}
+	}
+	return inst
+}
+
+// newBaseInstanceList is the baseInstanceList constructor
+func newBaseInstanceList(allocateCIDR bool, clusterCIDR *net.IPNet, subnetMaskSize int) *baseInstanceList {
+	cidrSet, _ := cidrset.NewCIDRSet(clusterCIDR, subnetMaskSize)
+	return &baseInstanceList{
+		allocateCIDR:   allocateCIDR,
+		clusterCIDR:    clusterCIDR,
+		subnetMaskSize: subnetMaskSize,
+		cidrSet:        cidrSet,
+		instances:      make(map[meta.Key]*baseInstance),
+	}
+}
+
+// getOrCreateBaseInstance lazily creates a new base instance, assigning an alias range if allocateCIDR is true
+func (bil *baseInstanceList) getOrCreateBaseInstance(key *meta.Key) *baseInstance {
+	bil.lock.Lock()
+	defer bil.lock.Unlock()
+
+	inst, found := bil.instances[*key]
+	if !found {
+		inst = &baseInstance{name: key.Name, zone: key.Zone}
+		if bil.allocateCIDR {
+			nextRange, _ := bil.cidrSet.AllocateNext()
+			inst.aliasRange = nextRange.String()
+		}
+		bil.instances[*key] = inst
+	}
+	return inst
+}
+
+// newGAGetHook creates a new closure with the current baseInstanceList to be used as a MockInstances.GetHook
+func (bil *baseInstanceList) newGAGetHook() func(ctx context.Context, key *meta.Key, m *cloud.MockInstances) (bool, *ga.Instance, error) {
+	return func(ctx context.Context, key *meta.Key, m *cloud.MockInstances) (bool, *ga.Instance, error) {
+		m.Lock.Lock()
+		defer m.Lock.Unlock()
+
+		if _, found := m.Objects[*key]; !found {
+			m.Objects[*key] = &cloud.MockInstancesObj{Obj: bil.getOrCreateBaseInstance(key).toGA()}
+		}
+		return false, nil, nil
+	}
+}
+
+// newBetaGetHook creates a new closure with the current baseInstanceList to be used as a MockBetaInstances.GetHook
+func (bil *baseInstanceList) newBetaGetHook() func(ctx context.Context, key *meta.Key, m *cloud.MockBetaInstances) (bool, *beta.Instance, error) {
+	return func(ctx context.Context, key *meta.Key, m *cloud.MockBetaInstances) (bool, *beta.Instance, error) {
+		m.Lock.Lock()
+		defer m.Lock.Unlock()
+
+		if _, found := m.Objects[*key]; !found {
+			m.Objects[*key] = &cloud.MockInstancesObj{Obj: bil.getOrCreateBaseInstance(key).toBeta()}
+		}
+		return false, nil, nil
+	}
+}
+
+// newMockCloud returns a mock GCE instance with the appropriate handler hooks
+func (bil *baseInstanceList) newMockCloud() cloud.Cloud {
+	c := cloud.NewMockGCE(nil)
+
+	// insert hooks to lazily create an instance when needed
+	c.MockInstances.GetHook = bil.newGAGetHook()
+	c.MockBetaInstances.GetHook = bil.newBetaGetHook()
+
+	return c
+}
diff --git a/test/integration/ipamperf/ipam_test.go b/test/integration/ipamperf/ipam_test.go
new file mode 100644
index 00000000000..1b679323b7e
--- /dev/null
+++ b/test/integration/ipamperf/ipam_test.go
@@ -0,0 +1,150 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipamperf
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/golang/glog"
+
+	"k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	clientset "k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/kubernetes/pkg/api/testapi"
+	"k8s.io/kubernetes/pkg/controller/nodeipam"
+	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
+	"k8s.io/kubernetes/test/integration/util"
+)
+
+func setupAllocator(apiURL string, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*clientset.Clientset, util.ShutdownFunc, error) {
+	controllerStopChan := make(chan struct{})
+	shutdownFunc := func() {
+		close(controllerStopChan)
+	}
+
+	clientSet := clientset.NewForConfigOrDie(&restclient.Config{
+		Host:          apiURL,
+		ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()},
+		QPS:           float32(config.KubeQPS),
+		Burst:         config.KubeQPS,
+	})
+
+	sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
+	ipamController, err := nodeipam.NewNodeIpamController(
+		sharedInformer.Core().V1().Nodes(), config.Cloud, clientSet,
+		clusterCIDR, serviceCIDR, subnetMaskSize, true, config.AllocatorType,
+	)
+	if err != nil {
+		return nil, shutdownFunc, err
+	}
+	go ipamController.Run(controllerStopChan)
+	sharedInformer.Start(controllerStopChan)
+
+	return clientSet, shutdownFunc, nil
+}
+
+func runTest(t *testing.T, apiURL string, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*Results, error) {
+	t.Helper()
+	glog.Infof("Running test %s", t.Name())
+
+	defer deleteNodes(apiURL, config) // clean up nodes after controller shutdown
+
+	clientSet, shutdownFunc, err := setupAllocator(apiURL, config, clusterCIDR, serviceCIDR, subnetMaskSize)
+	if err != nil {
+		t.Fatalf("Error starting IPAM allocator: %v", err)
+	}
+	defer shutdownFunc()
+
+	o := NewObserver(clientSet, config.NumNodes)
+	if err := o.StartObserving(); err != nil {
+		t.Fatalf("Could not start test observer: %v", err)
+	}
+
+	if err := createNodes(apiURL, config); err != nil {
+		t.Fatalf("Could not create nodes: %v", err)
+	}
+
+	results := o.Results(t.Name(), config)
+	glog.Infof("Results: %s", results)
+	if !results.Succeeded {
+		t.Errorf("%s: Not all allocations succeeded", t.Name())
+	}
+	return results, nil
+}
+
+func logResults(allResults []*Results) {
+	jStr, err := json.MarshalIndent(allResults, "", " ")
+	if err != nil {
+		glog.Errorf("Error formatting results: %v", err)
+		return
+	}
+	if resultsLogFile != "" {
+		glog.Infof("Logging results to %s", resultsLogFile)
+		if err := ioutil.WriteFile(resultsLogFile, jStr, os.FileMode(0644)); err != nil {
+			glog.Errorf("Error logging results to %s: %v", resultsLogFile, err)
+		}
+	}
+	glog.Infof("AllResults:\n%s", string(jStr))
+}
+
+func TestPerformance(t *testing.T) {
+	apiURL, masterShutdown := util.StartApiserver()
+	defer masterShutdown()
+
+	_, clusterCIDR, _ := net.ParseCIDR("10.96.0.0/11") // allows up to 8K nodes
+	_, serviceCIDR, _ := net.ParseCIDR("10.94.0.0/24") // does not matter for test - pick up to 250 services
+	subnetMaskSize := 24
+
+	var (
+		allResults []*Results
+		tests      []*Config
+	)
+
+	for _, numNodes := range []int{10, 100} {
+		for _, alloc := range []ipam.CIDRAllocatorType{ipam.RangeAllocatorType, ipam.CloudAllocatorType, ipam.IPAMFromClusterAllocatorType, ipam.IPAMFromCloudAllocatorType} {
+			tests = append(tests, &Config{AllocatorType: alloc, NumNodes: numNodes, CreateQPS: numNodes, KubeQPS: 10, CloudQPS: 10})
+		}
+	}
+
+	for _, test := range tests {
+		testName := fmt.Sprintf("%s-KubeQPS%d-Nodes%d", test.AllocatorType, test.KubeQPS, test.NumNodes)
+		t.Run(testName, func(t *testing.T) {
+			allocateCIDR := false
+			if test.AllocatorType == ipam.IPAMFromCloudAllocatorType || test.AllocatorType == ipam.CloudAllocatorType {
+				allocateCIDR = true
+			}
+			bil := newBaseInstanceList(allocateCIDR, clusterCIDR, subnetMaskSize)
+			cloud, err := util.NewMockGCECloud(bil.newMockCloud())
+			if err != nil {
+				t.Fatalf("Unable to create mock cloud: %v", err)
+			}
+			test.Cloud = cloud
+			if results, err := runTest(t, apiURL, test, clusterCIDR, serviceCIDR, subnetMaskSize); err == nil {
+				allResults = append(allResults, results)
+			}
+		})
+	}
+
+	logResults(allResults)
+}
diff --git a/test/integration/ipamperf/main_test.go b/test/integration/ipamperf/main_test.go
new file mode 100644
index 00000000000..6ab7fb68aca
--- /dev/null
+++ b/test/integration/ipamperf/main_test.go
@@ -0,0 +1,32 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipamperf
+
+import (
+	"flag"
+	"testing"
+
+	"k8s.io/kubernetes/test/integration/framework"
+)
+
+var resultsLogFile string
+
+func TestMain(m *testing.M) {
+	flag.StringVar(&resultsLogFile, "log", "", "log file to write JSON results to")
+	flag.Parse()
+	framework.EtcdMain(m.Run)
+}
diff --git a/test/integration/ipamperf/results.go b/test/integration/ipamperf/results.go
new file mode 100644
index 00000000000..26452354e8b
--- /dev/null
+++ b/test/integration/ipamperf/results.go
@@ -0,0 +1,218 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipamperf
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
+	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
+)
+
+// Config represents the test configuration that is being run
+type Config struct {
+	CreateQPS     int                     // rate at which nodes are created
+	KubeQPS       int                     // rate for communication with kubernetes API
+	CloudQPS      int                     // rate for communication with cloud endpoint
+	NumNodes      int                     // number of nodes to be created and monitored
+	AllocatorType ipam.CIDRAllocatorType  // type of allocator to run
+	Cloud         cloudprovider.Interface // cloud provider
+}
+
+type nodeTime struct {
+	added     time.Time // observed time for when node was added
+	allocated time.Time // observed time for when node was assigned podCIDR
+	podCIDR   string    // the allocated podCIDR range
+}
+
+// Observer represents a handle to the test observer that watches for node changes
+// and tracks behavior
+type Observer struct {
+	numAdded     int                  // number of nodes observed added
+	numAllocated int                  // number of nodes observed allocated podCIDR
+	timing       map[string]*nodeTime // per node timing
+	numNodes     int                  // the number of nodes to expect
+	stopChan     chan struct{}        // for the shared informer
+	wg           sync.WaitGroup
+	clientSet    *clientset.Clientset
+}
+
+// JSONDuration is an alias of time.Duration to support custom Marshal code
+type JSONDuration time.Duration
+
+// NodeDuration represents the CIDR allocation time for each node
+type NodeDuration struct {
+	Name     string       // node name
+	PodCIDR  string       // the podCIDR that was assigned to the node
+	Duration JSONDuration // how long it took to assign podCIDR
+}
+
+// Results represents the observed test results.
+type Results struct {
+	Name           string         // name for the test
+	Config         *Config        // handle to the test config
+	Succeeded      bool           // whether all nodes were assigned podCIDR
+	MaxAllocTime   JSONDuration   // the maximum time taken for assignment per node
+	TotalAllocTime JSONDuration   // duration between first addition and last assignment
+	NodeAllocTime  []NodeDuration // assignment time by node name
+}
+
+// NewObserver creates a new observer given a handle to the Clientset
+func NewObserver(clientSet *clientset.Clientset, numNodes int) *Observer {
+	o := &Observer{
+		timing:    map[string]*nodeTime{},
+		numNodes:  numNodes,
+		clientSet: clientSet,
+		stopChan:  make(chan struct{}),
+	}
+	return o
+}
+
+// StartObserving starts an asynchronous loop to monitor for node changes.
+// Call Results() to get the test results after starting the observer.
+func (o *Observer) StartObserving() error {
+	go o.monitor()
+	glog.Infof("Test observer started")
+	return nil
+}
+
+// Results returns the test results. It waits for the observer to finish
+// and returns the computed results of the observations.
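+//
+// A sketch of the intended call order (mirroring runTest in ipam_test.go; the
+// variable names below are illustrative):
+//
+//	o := NewObserver(clientSet, numNodes)
+//	if err := o.StartObserving(); err != nil { ... }
+//	// ... create the nodes via the Kubernetes API ...
+//	results := o.Results(testName, config) // blocks until all expected nodes get a podCIDR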
+func (o *Observer) Results(name string, config *Config) *Results {
+	var (
+		firstAdd       time.Time // earliest time any node was added (first node add)
+		lastAssignment time.Time // latest time any node was assigned CIDR (last node assignment)
+	)
+	o.wg.Wait()
+	close(o.stopChan) // shut down the shared informer
+
+	results := &Results{
+		Name:          name,
+		Config:        config,
+		Succeeded:     o.numAdded == o.numNodes && o.numAllocated == o.numNodes,
+		MaxAllocTime:  0,
+		NodeAllocTime: []NodeDuration{},
+	}
+	for name, nTime := range o.timing {
+		addFound := !nTime.added.IsZero()
+		if addFound && (firstAdd.IsZero() || nTime.added.Before(firstAdd)) {
+			firstAdd = nTime.added
+		}
+		cidrFound := !nTime.allocated.IsZero()
+		if cidrFound && nTime.allocated.After(lastAssignment) {
+			lastAssignment = nTime.allocated
+		}
+		if addFound && cidrFound {
+			allocTime := nTime.allocated.Sub(nTime.added)
+			if allocTime > time.Duration(results.MaxAllocTime) {
+				results.MaxAllocTime = JSONDuration(allocTime)
+			}
+			results.NodeAllocTime = append(results.NodeAllocTime, NodeDuration{
+				Name: name, PodCIDR: nTime.podCIDR, Duration: JSONDuration(allocTime),
+			})
+		}
+	}
+	results.TotalAllocTime = JSONDuration(lastAssignment.Sub(firstAdd))
+	sort.Slice(results.NodeAllocTime, func(i, j int) bool {
+		return results.NodeAllocTime[i].Duration > results.NodeAllocTime[j].Duration
+	})
+	return results
+}
+
+func (o *Observer) monitor() {
+	o.wg.Add(1)
+
+	sharedInformer := informers.NewSharedInformerFactory(o.clientSet, 1*time.Second)
+	nodeInformer := sharedInformer.Core().V1().Nodes().Informer()
+
+	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) (err error) {
+			name := node.GetName()
+			if node.Spec.PodCIDR != "" {
+				// ignore nodes that have a PodCIDR (might be holdovers from previous runs that did not get cleaned up)
+				return
+			}
+			nTime := &nodeTime{}
+			o.timing[name] = nTime
+			nTime.added = time.Now()
+			o.numAdded = o.numAdded + 1
+			return
+		}),
+		UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) (err error) {
+			name := newNode.GetName()
+			nTime, found := o.timing[name]
+			if !found {
+				return // consistency check - ignore nodes we have not seen the add event for
+			}
+			// check if CIDR assigned and ignore redundant updates
+			if newNode.Spec.PodCIDR != "" && nTime.podCIDR == "" {
+				nTime.allocated = time.Now()
+				nTime.podCIDR = newNode.Spec.PodCIDR
+				o.numAllocated++
+				// do the following check only if numAllocated is modified, as otherwise redundant updates
+				// can cause wg.Done() to be called multiple times, causing a panic
+				if o.numAdded == o.numNodes && o.numAllocated == o.numNodes {
+					glog.Info("All nodes assigned podCIDR")
+					o.wg.Done()
+				}
+			}
+			return
+		}),
+	})
+	sharedInformer.Start(o.stopChan)
+}
+
+// String implements the Stringer interface and returns a multi-line representation
+// of the test results.
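+//
+// Illustrative output (the values below are made up; node names come from the
+// "sample-node-" GenerateName prefix used by the tests):
+//
+//	 TestName: TestPerformance/RangeAllocator-KubeQPS10-Nodes10
+//	 NumNodes: 10, CreateQPS: 10, KubeQPS: 10, CloudQPS: 10, Allocator: RangeAllocator
+//	 Succeeded: true, TotalAllocTime: 2.5s, MaxAllocTime: 1.2s
+//	   Num Node                 PodCIDR              Duration (s)
+//	     1 sample-node-x7f2q    10.96.0.0/24              1.200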
+func (results *Results) String() string {
+	var b bytes.Buffer
+	fmt.Fprintf(&b, "\n TestName: %s", results.Name)
+	fmt.Fprintf(&b, "\n NumNodes: %d, CreateQPS: %d, KubeQPS: %d, CloudQPS: %d, Allocator: %v",
+		results.Config.NumNodes, results.Config.CreateQPS, results.Config.KubeQPS,
+		results.Config.CloudQPS, results.Config.AllocatorType)
+	fmt.Fprintf(&b, "\n Succeeded: %v, TotalAllocTime: %v, MaxAllocTime: %v",
+		results.Succeeded, time.Duration(results.TotalAllocTime), time.Duration(results.MaxAllocTime))
+	fmt.Fprintf(&b, "\n %5s %-20s %-20s %s", "Num", "Node", "PodCIDR", "Duration (s)")
+	for i, d := range results.NodeAllocTime {
+		fmt.Fprintf(&b, "\n %5d %-20s %-20s %10.3f", i+1, d.Name, d.PodCIDR, time.Duration(d.Duration).Seconds())
+	}
+	return b.String()
+}
+
+// MarshalJSON implements the json.Marshaler interface
+func (jDuration *JSONDuration) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf("\"%s\"", time.Duration(*jDuration).String())), nil
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface
+func (jDuration *JSONDuration) UnmarshalJSON(b []byte) (err error) {
+	var d time.Duration
+	if d, err = time.ParseDuration(string(b[1 : len(b)-1])); err == nil {
+		*jDuration = JSONDuration(d)
+	}
+	return
+}
diff --git a/test/integration/ipamperf/test-performance.sh b/test/integration/ipamperf/test-performance.sh
new file mode 100755
index 00000000000..766b19b2107
--- /dev/null
+++ b/test/integration/ipamperf/test-performance.sh
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+
+# Copyright 2018 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -o errexit
+set -o nounset
+set -o pipefail
+
+TEST_ARGS=""
+RUN_PATTERN=".*"
+
+function usage() {
+  echo "usage: $0 [-h] [-d] [-r <pattern>] [-o <filename>]"
+  echo " -h display this help message"
+  echo " -d enable debug logs in tests"
+  echo " -r regex pattern to match for tests"
+  echo " -o file to write JSON formatted results to"
+  exit 1
+}
+
+while getopts ":hdr:o:" opt; do
+  case ${opt} in
+    d) TEST_ARGS="${TEST_ARGS} -v=6"
+      ;;
+    r) RUN_PATTERN="${OPTARG}"
+      ;;
+    o) TEST_ARGS="${TEST_ARGS} -log ${OPTARG}"
+      ;;
+    h) usage
+      ;;
+    \?) usage
+      ;;
+  esac
+done
+
+KUBE_ROOT=$(dirname "${BASH_SOURCE}")/../../../
+source "${KUBE_ROOT}/hack/lib/init.sh"
+
+kube::golang::setup_env
+
+DIR_BASENAME=$(dirname "${BASH_SOURCE}")
+pushd ${DIR_BASENAME}
+
+cleanup() {
+  popd 2> /dev/null
+  kube::etcd::cleanup
+  kube::log::status "performance test cleanup complete"
+}
+
+trap cleanup EXIT
+
+kube::etcd::start
+
+# Running IPAM tests. It might take a long time.
+kube::log::status "performance test (IPAM) start"
+go test -test.run=${RUN_PATTERN} -test.timeout=60m -test.short=false -v -args ${TEST_ARGS}
+kube::log::status "... IPAM tests finished."
diff --git a/test/integration/ipamperf/util.go b/test/integration/ipamperf/util.go
new file mode 100644
index 00000000000..d45f21515ee
--- /dev/null
+++ b/test/integration/ipamperf/util.go
@@ -0,0 +1,80 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipamperf
+
+import (
+	"github.com/golang/glog"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/kubernetes/pkg/api/testapi"
+)
+
+var (
+	baseNodeTemplate = &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			GenerateName: "sample-node-",
+		},
+		Spec: v1.NodeSpec{
+			ExternalID: "foo",
+		},
+		Status: v1.NodeStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourcePods:   *resource.NewQuantity(110, resource.DecimalSI),
+				v1.ResourceCPU:    resource.MustParse("4"),
+				v1.ResourceMemory: resource.MustParse("32Gi"),
+			},
+			Phase: v1.NodeRunning,
+			Conditions: []v1.NodeCondition{
+				{Type: v1.NodeReady, Status: v1.ConditionTrue},
+			},
+		},
+	}
+)
+
+func deleteNodes(apiURL string, config *Config) {
+	glog.Info("Deleting nodes")
+	clientSet := clientset.NewForConfigOrDie(&restclient.Config{
+		Host:          apiURL,
+		ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()},
+		QPS:           float32(config.CreateQPS),
+		Burst:         config.CreateQPS,
+	})
+	noGrace := int64(0)
+	if err := clientSet.CoreV1().Nodes().DeleteCollection(&metav1.DeleteOptions{GracePeriodSeconds: &noGrace}, metav1.ListOptions{}); err != nil {
+		glog.Errorf("Error deleting nodes: %v", err)
+	}
+}
+
+func createNodes(apiURL string, config *Config) error {
+	clientSet := clientset.NewForConfigOrDie(&restclient.Config{
+		Host:          apiURL,
+		ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()},
+		QPS:           float32(config.CreateQPS),
+		Burst:         config.CreateQPS,
+	})
+	glog.Infof("Creating %d nodes", config.NumNodes)
+	for i := 0; i < config.NumNodes; i++ {
+		if _, err := clientSet.CoreV1().Nodes().Create(baseNodeTemplate); err != nil {
+			return err
+		}
+	}
+	glog.Infof("%d nodes created", config.NumNodes)
+	return nil
+}
diff --git a/test/integration/util/BUILD b/test/integration/util/BUILD
index fe7d7bc6c0f..bfa500665c8 100644
--- a/test/integration/util/BUILD
+++ b/test/integration/util/BUILD
@@ -8,15 +8,19 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
+        "cloud.go",
         "util.go",
     ],
     importpath = "k8s.io/kubernetes/test/integration/util",
     deps = [
         "//pkg/api/legacyscheme:go_default_library",
+        "//pkg/cloudprovider/providers/gce:go_default_library",
+        "//pkg/cloudprovider/providers/gce/cloud:go_default_library",
         "//pkg/scheduler:go_default_library",
         "//pkg/scheduler/factory:go_default_library",
         "//test/integration/framework:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
+        "//vendor/golang.org/x/oauth2:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/informers:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
diff --git a/test/integration/util/cloud.go b/test/integration/util/cloud.go
new file mode 100644
index 00000000000..d5c758d4aa6
--- /dev/null
+++ b/test/integration/util/cloud.go
@@ -0,0 +1,72 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"time"
+
+	"golang.org/x/oauth2"
+	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
+	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
+)
+
+const (
+	// TestProjectID is the project ID used for creating NewMockGCECloud
+	TestProjectID = "test-project"
+	// TestNetworkProjectID is the network project ID for creating NewMockGCECloud
+	TestNetworkProjectID = "net-test-project"
+	// TestRegion is the region for creating NewMockGCECloud
+	TestRegion = "test-region"
+	// TestZone is the zone for creating NewMockGCECloud
+	TestZone = "test-zone"
+	// TestNetworkName is the network name for creating NewMockGCECloud
+	TestNetworkName = "test-network"
+	// TestSubnetworkName is the subnetwork name for creating NewMockGCECloud
+	TestSubnetworkName = "test-sub-network"
+	// TestSecondaryRangeName is the secondary range name for creating NewMockGCECloud
+	TestSecondaryRangeName = "test-secondary-range"
+)
+
+type mockTokenSource struct{}
+
+func (*mockTokenSource) Token() (*oauth2.Token, error) {
+	return &oauth2.Token{
+		AccessToken:  "access",
+		TokenType:    "Bearer",
+		RefreshToken: "refresh",
+		Expiry:       time.Now().Add(1 * time.Hour),
+	}, nil
+}
+
+// NewMockGCECloud returns a handle to a GCECloud instance that is
+// backed by the given mock Cloud implementation
+func NewMockGCECloud(cloud cloud.Cloud) (*gce.GCECloud, error) {
+	config := &gce.CloudConfig{
+		ProjectID:          TestProjectID,
+		NetworkProjectID:   TestNetworkProjectID,
+		Region:             TestRegion,
+		Zone:               TestZone,
+		ManagedZones:       []string{TestZone},
+		NetworkName:        TestNetworkName,
+		SubnetworkName:     TestSubnetworkName,
+		SecondaryRangeName: TestSecondaryRangeName,
+		NodeTags:           []string{},
+		UseMetadataServer:  false,
+		TokenSource:        &mockTokenSource{},
+	}
+	return gce.CreateGCECloudWithCloud(config, cloud)
+}