From 2fae3cbcfe82156dada603158f657b25fb3494eb Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Wed, 24 Jul 2019 11:01:42 +0200 Subject: [PATCH 1/2] Add simple batching to endpoints controller --- api/api-rules/violation_exceptions.list | 1 + cmd/kube-controller-manager/app/core.go | 1 + .../app/options/endpointcontroller.go | 2 + pkg/controller/endpoint/config/BUILD | 1 + pkg/controller/endpoint/config/types.go | 11 + .../endpoint/endpoints_controller.go | 12 +- .../endpoint/endpoints_controller_test.go | 507 ++++++++++++++++-- .../config/v1alpha1/types.go | 5 + 8 files changed, 479 insertions(+), 61 deletions(-) diff --git a/api/api-rules/violation_exceptions.list b/api/api-rules/violation_exceptions.list index f55882430b1..0d6ccead032 100644 --- a/api/api-rules/violation_exceptions.list +++ b/api/api-rules/violation_exceptions.list @@ -543,6 +543,7 @@ API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,D API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,DeprecatedControllerConfiguration,DeletingPodsQPS API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,DeprecatedControllerConfiguration,RegisterRetryCount API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointControllerConfiguration,ConcurrentEndpointSyncs +API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointControllerConfiguration,EndpointUpdatesBatchPeriod API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,ConcurrentGCSyncs API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,EnableGarbageCollector API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,GCIgnoredResources diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 64528678e8d..d80c6e5a111 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -300,6 +300,7 @@ func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) ctx.InformerFactory.Core().V1().Services(), ctx.InformerFactory.Core().V1().Endpoints(), ctx.ClientBuilder.ClientOrDie("endpoint-controller"), + ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, ).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/options/endpointcontroller.go b/cmd/kube-controller-manager/app/options/endpointcontroller.go index f60b4e52447..ae052154fef 100644 --- a/cmd/kube-controller-manager/app/options/endpointcontroller.go +++ b/cmd/kube-controller-manager/app/options/endpointcontroller.go @@ -34,6 +34,7 @@ func (o *EndpointControllerOptions) AddFlags(fs *pflag.FlagSet) { } fs.Int32Var(&o.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", o.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") + fs.DurationVar(&o.EndpointUpdatesBatchPeriod.Duration, "endpoint-updates-batch-period", o.EndpointUpdatesBatchPeriod.Duration, "The length of the endpoint updates batching period. Processing of pod changes will be delayed by this duration to join them with potential upcoming updates and reduce the overall number of endpoint updates.
Larger number = higher endpoint programming latency, but lower number of endpoint revisions generated") } // ApplyTo fills up EndPointController config with options. @@ -43,6 +44,7 @@ func (o *EndpointControllerOptions) ApplyTo(cfg *endpointconfig.EndpointControll } cfg.ConcurrentEndpointSyncs = o.ConcurrentEndpointSyncs + cfg.EndpointUpdatesBatchPeriod = o.EndpointUpdatesBatchPeriod return nil } diff --git a/pkg/controller/endpoint/config/BUILD b/pkg/controller/endpoint/config/BUILD index 2984a8ff20a..e4f40c09b5f 100644 --- a/pkg/controller/endpoint/config/BUILD +++ b/pkg/controller/endpoint/config/BUILD @@ -9,6 +9,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/endpoint/config", visibility = ["//visibility:public"], + deps = ["//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library"], ) filegroup( diff --git a/pkg/controller/endpoint/config/types.go b/pkg/controller/endpoint/config/types.go index e96389d5757..00387397779 100644 --- a/pkg/controller/endpoint/config/types.go +++ b/pkg/controller/endpoint/config/types.go @@ -16,10 +16,21 @@ limitations under the License. package config +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + // EndpointControllerConfiguration contains elements describing EndpointController. type EndpointControllerConfiguration struct { // concurrentEndpointSyncs is the number of endpoint syncing operations // that will be done concurrently. Larger number = faster endpoint updating, // but more CPU (and network) load. ConcurrentEndpointSyncs int32 + + // EndpointUpdatesBatchPeriod can be used to batch endpoint updates. + // All endpoint updates triggered by pod changes will be delayed by up to + // 'EndpointUpdatesBatchPeriod'. If other pods in the same endpoint change + // in that period, they will be batched into a single endpoint update. + // The default value of 0 means that each pod update triggers an endpoint update. + EndpointUpdatesBatchPeriod metav1.Duration } diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 799dba10335..fd2da304e0d 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -22,7 +22,7 @@ import ( "strconv" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -72,7 +72,7 @@ const ( // NewEndpointController returns a new *EndpointController. func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, - endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController { + endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *EndpointController { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) @@ -112,6 +112,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.eventBroadcaster = broadcaster e.eventRecorder = recorder + e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod + return e } @@ -155,6 +157,8 @@ type EndpointController struct { // triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime // annotation. 
triggerTimeTracker *TriggerTimeTracker + + endpointUpdatesBatchPeriod time.Duration } // Run will not return until stopCh is closed. workers determines how many @@ -210,7 +214,7 @@ func (e *EndpointController) addPod(obj interface{}) { return } for key := range services { - e.queue.Add(key) + e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } } @@ -311,7 +315,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) { } for key := range services { - e.queue.Add(key) + e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index b60a2efe969..6b7236acce1 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -20,10 +20,11 @@ import ( "fmt" "net/http" "net/http/httptest" + "strconv" "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -48,36 +49,43 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) var triggerTimeString = triggerTime.Format(time.RFC3339Nano) var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano) -func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { - for i := 0; i < nPods+nNotReady; i++ { - p := &v1.Pod{ - TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - Name: fmt.Sprintf("pod%d", i), - Labels: map[string]string{"foo": "bar"}, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, - }, - Status: v1.PodStatus{ - PodIP: fmt.Sprintf("1.2.3.%d", 4+i), - Conditions: []v1.PodCondition{ - { - Type: v1.PodReady, - Status: v1.ConditionTrue, - }, +func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod { + p := &v1.Pod{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("pod%d", id), + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, + }, + Status: v1.PodStatus{ + PodIP: fmt.Sprintf("1.2.3.%d", 4+id), + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, }, }, - } - if i >= nPods { - p.Status.Conditions[0].Status = v1.ConditionFalse - } - for j := 0; j < nPorts; j++ { - p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, - v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)}) - } - store.Add(p) + }, + } + if !isReady { + p.Status.Conditions[0].Status = v1.ConditionFalse + } + for j := 0; j < nPorts; j++ { + p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, + v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) + } + + return p +} + +func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { + for i := 0; i < nPods+nNotReady; i++ { + isReady := i < nPods + pod := testPod(namespace, i, nPorts, isReady) + store.Add(pod) } } @@ -107,7 +115,7 @@ func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namesp } for j := 0; j < nPorts; j++ { p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, - v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)}) + v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) } 
store.Add(p) } @@ -135,11 +143,11 @@ type endpointController struct { endpointsStore cache.Store } -func newController(url string) *endpointController { +func newController(url string, batchPeriod time.Duration) *endpointController { client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), - informerFactory.Core().V1().Endpoints(), client) + informerFactory.Core().V1().Endpoints(), client, batchPeriod) endpoints.podsSynced = alwaysReady endpoints.servicesSynced = alwaysReady endpoints.endpointsSynced = alwaysReady @@ -155,7 +163,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -179,7 +187,7 @@ func TestSyncEndpointsExistingNilSubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -203,7 +211,7 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -227,7 +235,7 @@ func TestSyncEndpointsNewNoSubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -243,7 +251,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) { ns := metav1.NamespaceDefault testServer, _ := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -269,7 +277,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -310,7 +318,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -351,7 +359,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { ns := "other" testServer, 
endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -392,7 +400,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -429,7 +437,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -466,7 +474,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -504,7 +512,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { ns := "bar" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -544,7 +552,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "1", @@ -572,7 +580,7 @@ func TestSyncEndpointsItems(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) addPods(endpoints.podStore, ns, 3, 2, 0) addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found! 
endpoints.serviceStore.Add(&v1.Service{ @@ -613,7 +621,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) addPods(endpoints.podStore, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Add(&v1.Service{ @@ -659,7 +667,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { ns := "bar" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -722,7 +730,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) addPods(endpoints.podStore, ns, 1, 1, 0) service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -762,7 +770,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { ns := "headless" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -803,7 +811,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -839,7 +847,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -875,7 +883,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -911,7 +919,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -1183,7 +1191,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -1227,7 +1235,7 @@ func 
TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -1274,7 +1282,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() - endpoints := newController(testServer.URL) + endpoints := newController(testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -1315,3 +1323,388 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { }) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } + +// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together. +// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. +// TODO(mborsz): Migrate this test to mock clock when possible. +func TestPodUpdatesBatching(t *testing.T) { + type podUpdate struct { + delay time.Duration + podName string + podIP string + } + + tests := []struct { + name string + batchPeriod time.Duration + podsCount int + updates []podUpdate + finalDelay time.Duration + wantRequestCount int + }{ + { + name: "three updates with no batching", + batchPeriod: 0 * time.Second, + podsCount: 10, + updates: []podUpdate{ + { + // endpoints.Run needs ~100 ms to start processing updates. + delay: 200 * time.Millisecond, + podName: "pod0", + podIP: "10.0.0.0", + }, + { + delay: 100 * time.Millisecond, + podName: "pod1", + podIP: "10.0.0.1", + }, + { + delay: 100 * time.Millisecond, + podName: "pod2", + podIP: "10.0.0.2", + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 3, + }, + { + name: "three updates in one batch", + batchPeriod: 1 * time.Second, + podsCount: 10, + updates: []podUpdate{ + { + // endpoints.Run needs ~100 ms to start processing updates. + delay: 200 * time.Millisecond, + podName: "pod0", + podIP: "10.0.0.0", + }, + { + delay: 100 * time.Millisecond, + podName: "pod1", + podIP: "10.0.0.1", + }, + { + delay: 100 * time.Millisecond, + podName: "pod2", + podIP: "10.0.0.2", + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 1, + }, + { + name: "three updates in two batches", + batchPeriod: 1 * time.Second, + podsCount: 10, + updates: []podUpdate{ + { + // endpoints.Run needs ~100 ms to start processing updates. 
+ delay: 200 * time.Millisecond, + podName: "pod0", + podIP: "10.0.0.0", + }, + { + delay: 100 * time.Millisecond, + podName: "pod1", + podIP: "10.0.0.1", + }, + { + delay: 1 * time.Second, + podName: "pod2", + podIP: "10.0.0.2", + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ns := "other" + resourceVersion := 1 + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL, tc.batchPeriod) + stopCh := make(chan struct{}) + defer close(stopCh) + endpoints.podsSynced = alwaysReady + endpoints.servicesSynced = alwaysReady + endpoints.endpointsSynced = alwaysReady + endpoints.workerLoopPeriod = 10 * time.Millisecond + + go endpoints.Run(1, stopCh) + + addPods(endpoints.podStore, ns, tc.podsCount, 1, 0) + + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80}}, + }, + }) + + for _, update := range tc.updates { + time.Sleep(update.delay) + + old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName)) + if err != nil { + t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err) + } + if !exists { + t.Fatalf("Pod %q doesn't exist", update.podName) + } + oldPod := old.(*v1.Pod) + newPod := oldPod.DeepCopy() + newPod.Status.PodIP = update.podIP + newPod.ResourceVersion = strconv.Itoa(resourceVersion) + resourceVersion++ + + endpoints.podStore.Update(newPod) + endpoints.updatePod(oldPod, newPod) + } + + time.Sleep(tc.finalDelay) + endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount) + }) + } +} + +// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together. +// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. +// TODO(mborsz): Migrate this test to mock clock when possible. +func TestPodAddsBatching(t *testing.T) { + type podAdd struct { + delay time.Duration + } + + tests := []struct { + name string + batchPeriod time.Duration + adds []podAdd + finalDelay time.Duration + wantRequestCount int + }{ + { + name: "three adds with no batching", + batchPeriod: 0 * time.Second, + adds: []podAdd{ + { + // endpoints.Run needs ~100 ms to start processing updates. + delay: 200 * time.Millisecond, + }, + { + delay: 100 * time.Millisecond, + }, + { + delay: 100 * time.Millisecond, + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 3, + }, + { + name: "three adds in one batch", + batchPeriod: 1 * time.Second, + adds: []podAdd{ + { + // endpoints.Run needs ~100 ms to start processing updates. + delay: 200 * time.Millisecond, + }, + { + delay: 100 * time.Millisecond, + }, + { + delay: 100 * time.Millisecond, + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 1, + }, + { + name: "three adds in two batches", + batchPeriod: 1 * time.Second, + adds: []podAdd{ + { + // endpoints.Run needs ~100 ms to start processing updates. 
+ delay: 200 * time.Millisecond, + }, + { + delay: 100 * time.Millisecond, + }, + { + delay: 1 * time.Second, + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL, tc.batchPeriod) + stopCh := make(chan struct{}) + defer close(stopCh) + endpoints.podsSynced = alwaysReady + endpoints.servicesSynced = alwaysReady + endpoints.endpointsSynced = alwaysReady + endpoints.workerLoopPeriod = 10 * time.Millisecond + + go endpoints.Run(1, stopCh) + + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80}}, + }, + }) + + for i, add := range tc.adds { + time.Sleep(add.delay) + + p := testPod(ns, i, 1, true) + endpoints.podStore.Add(p) + endpoints.addPod(p) + } + + time.Sleep(tc.finalDelay) + endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount) + }) + } +} + +// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together. +// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. +// TODO(mborsz): Migrate this test to mock clock when possible. +func TestPodDeleteBatching(t *testing.T) { + type podDelete struct { + delay time.Duration + podName string + } + + tests := []struct { + name string + batchPeriod time.Duration + podsCount int + deletes []podDelete + finalDelay time.Duration + wantRequestCount int + }{ + { + name: "three deletes with no batching", + batchPeriod: 0 * time.Second, + podsCount: 10, + deletes: []podDelete{ + { + // endpoints.Run needs ~100 ms to start processing updates. + delay: 200 * time.Millisecond, + podName: "pod0", + }, + { + delay: 100 * time.Millisecond, + podName: "pod1", + }, + { + delay: 100 * time.Millisecond, + podName: "pod2", + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 3, + }, + { + name: "three deletes in one batch", + batchPeriod: 1 * time.Second, + podsCount: 10, + deletes: []podDelete{ + { + // endpoints.Run needs ~100 ms to start processing updates. + delay: 200 * time.Millisecond, + podName: "pod0", + }, + { + delay: 100 * time.Millisecond, + podName: "pod1", + }, + { + delay: 100 * time.Millisecond, + podName: "pod2", + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 1, + }, + { + name: "three deletes in two batches", + batchPeriod: 1 * time.Second, + podsCount: 10, + deletes: []podDelete{ + { + // endpoints.Run needs ~100 ms to start processing updates. 
+ delay: 200 * time.Millisecond, + podName: "pod0", + }, + { + delay: 100 * time.Millisecond, + podName: "pod1", + }, + { + delay: 1 * time.Second, + podName: "pod2", + }, + }, + finalDelay: 3 * time.Second, + wantRequestCount: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL, tc.batchPeriod) + stopCh := make(chan struct{}) + defer close(stopCh) + endpoints.podsSynced = alwaysReady + endpoints.servicesSynced = alwaysReady + endpoints.endpointsSynced = alwaysReady + endpoints.workerLoopPeriod = 10 * time.Millisecond + + go endpoints.Run(1, stopCh) + + addPods(endpoints.podStore, ns, tc.podsCount, 1, 0) + + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80}}, + }, + }) + + for _, update := range tc.deletes { + time.Sleep(update.delay) + + old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName)) + if err != nil { + t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err) + } + if !exists { + t.Fatalf("Pod %q doesn't exist", update.podName) + } + endpoints.podStore.Delete(old) + endpoints.deletePod(old) + } + + time.Sleep(tc.finalDelay) + endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount) + }) + } +} diff --git a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go index 38d8bf4d3b3..56acbcdc953 100644 --- a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go +++ b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go @@ -279,6 +279,11 @@ type EndpointControllerConfiguration struct { // that will be done concurrently. Larger number = faster endpoint updating, // but more CPU (and network) load. ConcurrentEndpointSyncs int32 + + // EndpointUpdatesBatchPeriod describes the length of the endpoint updates batching period. + // Processing of pod changes will be delayed by this duration to join them with potential + // upcoming updates and reduce the overall number of endpoint updates. + EndpointUpdatesBatchPeriod metav1.Duration } // GarbageCollectorControllerConfiguration contains elements describing GarbageCollectorController. 
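Note on the mechanism (illustration only, not part of the patch): the batching needs no timers of its own — it falls out of switching queue.Add to queue.AddAfter. AddAfter schedules the service key after the batch period, and a key that is already waiting in the work queue is deduplicated, so any number of pod events for the same service inside the window collapse into a single sync. A minimal standalone sketch of that behavior, assuming only client-go's workqueue package; the key "other/foo" and the timings are illustrative:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same queue flavor the endpoints controller builds on.
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	batchPeriod := 1 * time.Second

	// Three pod events for the same service arrive within the batch period.
	// Each schedules the same service key; a key that is already waiting is
	// deduplicated, so only a single copy ever becomes ready.
	for i := 0; i < 3; i++ {
		q.AddAfter("other/foo", batchPeriod)
		time.Sleep(100 * time.Millisecond)
	}

	// The worker observes the key once: one endpoints write instead of three.
	key, _ := q.Get() // blocks until the delay has elapsed
	fmt.Println("syncing", key)
	q.Done(key)
	fmt.Println("left in queue:", q.Len()) // 0
}

With a zero batch period (the default), AddAfter degenerates to Add and each event can reach the worker separately — which is exactly what the "no batching" test cases above assert.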
From d96f24262d5e75fe6911ff1b4105d434ea54538d Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Wed, 24 Jul 2019 11:01:57 +0200 Subject: [PATCH 2/2] Autogenerated files --- .../endpoint/config/v1alpha1/zz_generated.conversion.go | 2 ++ pkg/controller/endpoint/config/zz_generated.deepcopy.go | 1 + .../config/v1alpha1/zz_generated.deepcopy.go | 1 + 3 files changed, 4 insertions(+) diff --git a/pkg/controller/endpoint/config/v1alpha1/zz_generated.conversion.go b/pkg/controller/endpoint/config/v1alpha1/zz_generated.conversion.go index 3303edbf611..1611c1d7fae 100644 --- a/pkg/controller/endpoint/config/v1alpha1/zz_generated.conversion.go +++ b/pkg/controller/endpoint/config/v1alpha1/zz_generated.conversion.go @@ -70,11 +70,13 @@ func RegisterConversions(s *runtime.Scheme) error { func autoConvert_v1alpha1_EndpointControllerConfiguration_To_config_EndpointControllerConfiguration(in *v1alpha1.EndpointControllerConfiguration, out *config.EndpointControllerConfiguration, s conversion.Scope) error { out.ConcurrentEndpointSyncs = in.ConcurrentEndpointSyncs + out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod return nil } func autoConvert_config_EndpointControllerConfiguration_To_v1alpha1_EndpointControllerConfiguration(in *config.EndpointControllerConfiguration, out *v1alpha1.EndpointControllerConfiguration, s conversion.Scope) error { out.ConcurrentEndpointSyncs = in.ConcurrentEndpointSyncs + out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod return nil } diff --git a/pkg/controller/endpoint/config/zz_generated.deepcopy.go b/pkg/controller/endpoint/config/zz_generated.deepcopy.go index b647cc4e760..b86208b5939 100644 --- a/pkg/controller/endpoint/config/zz_generated.deepcopy.go +++ b/pkg/controller/endpoint/config/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package config // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EndpointControllerConfiguration) DeepCopyInto(out *EndpointControllerConfiguration) { *out = *in + out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod return } diff --git a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go index f8d242772ce..29e94e1a904 100644 --- a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go @@ -126,6 +126,7 @@ func (in *DeprecatedControllerConfiguration) DeepCopy() *DeprecatedControllerCon // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EndpointControllerConfiguration) DeepCopyInto(out *EndpointControllerConfiguration) { *out = *in + out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod return }
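On the TODO(mborsz) about migrating the tests to a mock clock: one possible direction (a sketch under assumptions, not part of either patch) is to drive the controller's delaying queue with a fake clock. This assumes client-go's workqueue package exposes NewDelayingQueueWithCustomClock and apimachinery's util/clock provides a FakeClock, and the controller would first need to accept an injected queue for its tests to use this; the snippet below only demonstrates the queue-level mechanics:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A fake clock lets a test advance time explicitly instead of sleeping.
	fakeClock := clock.NewFakeClock(time.Now())
	q := workqueue.NewDelayingQueueWithCustomClock(fakeClock, "endpoint-test")
	defer q.ShutDown()

	// Schedule a key one batch period out; it is not ready yet.
	q.AddAfter("other/foo", time.Second)
	fmt.Println("ready before step:", q.Len()) // 0

	// Jump past the batch period in zero wall-clock time.
	fakeClock.Step(2 * time.Second)

	// The queue's background loop releases the item asynchronously,
	// so poll briefly instead of sleeping for the full batch period.
	for q.Len() == 0 {
		time.Sleep(time.Millisecond)
	}
	key, _ := q.Get()
	fmt.Println("ready after step:", key)
	q.Done(key)
}

This would let TestPodUpdatesBatching, TestPodAddsBatching, and TestPodDeleteBatching replace their 3-second finalDelay sleeps with a single clock step, removing most of the real-time flakiness risk.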