Add simple batching to endpoints controller

This commit is contained in:
Maciej Borsz
2019-07-24 11:01:42 +02:00
parent 3ea28073a4
commit 2fae3cbcfe
8 changed files with 479 additions and 61 deletions

View File

@@ -9,6 +9,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/endpoint/config",
visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library"],
)
filegroup(

View File

@@ -16,10 +16,21 @@ limitations under the License.
package config
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EndpointControllerConfiguration contains elements describing EndpointController.
type EndpointControllerConfiguration struct {
// concurrentEndpointSyncs is the number of endpoint syncing operations
// that will be done concurrently. Larger number = faster endpoint updating,
// but more CPU (and network) load.
ConcurrentEndpointSyncs int32
// EndpointUpdatesBatchPeriod can be used to batch endpoint updates.
// All updates of endpoint triggered by pod change will be delayed by up to
// 'EndpointUpdatesBatchPeriod'. If other pods in the same endpoint change
// in that period, they will be batched to a single endpoint update.
// Default 0 value means that each pod update triggers an endpoint update.
EndpointUpdatesBatchPeriod metav1.Duration
}

View File

@@ -22,7 +22,7 @@ import (
"strconv"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -72,7 +72,7 @@ const (
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController {
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *EndpointController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
@@ -112,6 +112,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.eventBroadcaster = broadcaster
e.eventRecorder = recorder
e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
return e
}
@@ -155,6 +157,8 @@ type EndpointController struct {
// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
// annotation.
triggerTimeTracker *TriggerTimeTracker
endpointUpdatesBatchPeriod time.Duration
}
// Run will not return until stopCh is closed. workers determines how many
@@ -210,7 +214,7 @@ func (e *EndpointController) addPod(obj interface{}) {
return
}
for key := range services {
e.queue.Add(key)
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}
@@ -311,7 +315,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
}
for key := range services {
e.queue.Add(key)
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}

View File

@@ -20,10 +20,11 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -48,36 +49,43 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
for i := 0; i < nPods+nNotReady; i++ {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("pod%d", i),
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
},
Status: v1.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("pod%d", id),
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
},
Status: v1.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
}
if i >= nPods {
p.Status.Conditions[0].Status = v1.ConditionFalse
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
}
store.Add(p)
},
}
if !isReady {
p.Status.Conditions[0].Status = v1.ConditionFalse
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
return p
}
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
for i := 0; i < nPods+nNotReady; i++ {
isReady := i < nPods
pod := testPod(namespace, i, nPorts, isReady)
store.Add(pod)
}
}
@@ -107,7 +115,7 @@ func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namesp
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
store.Add(p)
}
@@ -135,11 +143,11 @@ type endpointController struct {
endpointsStore cache.Store
}
func newController(url string) *endpointController {
func newController(url string, batchPeriod time.Duration) *endpointController {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
informerFactory.Core().V1().Endpoints(), client)
informerFactory.Core().V1().Endpoints(), client, batchPeriod)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
@@ -155,7 +163,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -179,7 +187,7 @@ func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -203,7 +211,7 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -227,7 +235,7 @@ func TestSyncEndpointsNewNoSubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -243,7 +251,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, _ := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -269,7 +277,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -310,7 +318,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -351,7 +359,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -392,7 +400,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -429,7 +437,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -466,7 +474,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -504,7 +512,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
ns := "bar"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -544,7 +552,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
@@ -572,7 +580,7 @@ func TestSyncEndpointsItems(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0)
addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found!
endpoints.serviceStore.Add(&v1.Service{
@@ -613,7 +621,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Add(&v1.Service{
@@ -659,7 +667,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
ns := "bar"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -722,7 +730,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 1, 1, 0)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -762,7 +770,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
ns := "headless"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -803,7 +811,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -839,7 +847,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -875,7 +883,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -911,7 +919,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -1183,7 +1191,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -1227,7 +1235,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -1274,7 +1282,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -1315,3 +1323,388 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
type podUpdate struct {
delay time.Duration
podName string
podIP string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
updates []podUpdate
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three updates with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three updates in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three updates in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 1 * time.Second,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := "other"
resourceVersion := 1
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(1, stopCh)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.updates {
time.Sleep(update.delay)
old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
if err != nil {
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
}
if !exists {
t.Fatalf("Pod %q doesn't exist", update.podName)
}
oldPod := old.(*v1.Pod)
newPod := oldPod.DeepCopy()
newPod.Status.PodIP = update.podIP
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
resourceVersion++
endpoints.podStore.Update(newPod)
endpoints.updatePod(oldPod, newPod)
}
time.Sleep(tc.finalDelay)
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
})
}
}
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
type podAdd struct {
delay time.Duration
}
tests := []struct {
name string
batchPeriod time.Duration
adds []podAdd
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three adds with no batching",
batchPeriod: 0 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three adds in one batch",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three adds in two batches",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 1 * time.Second,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(1, stopCh)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for i, add := range tc.adds {
time.Sleep(add.delay)
p := testPod(ns, i, 1, true)
endpoints.podStore.Add(p)
endpoints.addPod(p)
}
time.Sleep(tc.finalDelay)
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
})
}
}
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
type podDelete struct {
delay time.Duration
podName string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
deletes []podDelete
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three deletes with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three deletes in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three deletes in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 1 * time.Second,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(1, stopCh)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.deletes {
time.Sleep(update.delay)
old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
if err != nil {
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
}
if !exists {
t.Fatalf("Pod %q doesn't exist", update.podName)
}
endpoints.podStore.Delete(old)
endpoints.deletePod(old)
}
time.Sleep(tc.finalDelay)
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
})
}
}