Merge pull request #88745 from mborsz/slice3

Implement simple endpoint slice batching
This commit is contained in:
Kubernetes Prow Robot 2020-03-03 03:03:38 -08:00 committed by GitHub
commit c86aec0564
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 426 additions and 10 deletions

View File

@ -597,6 +597,7 @@ API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,D
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointControllerConfiguration,ConcurrentEndpointSyncs API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointControllerConfiguration,ConcurrentEndpointSyncs
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointControllerConfiguration,EndpointUpdatesBatchPeriod API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointControllerConfiguration,EndpointUpdatesBatchPeriod
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointSliceControllerConfiguration,ConcurrentServiceEndpointSyncs API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointSliceControllerConfiguration,ConcurrentServiceEndpointSyncs
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointSliceControllerConfiguration,EndpointUpdatesBatchPeriod
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointSliceControllerConfiguration,MaxEndpointsPerSlice API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,EndpointSliceControllerConfiguration,MaxEndpointsPerSlice
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,ConcurrentGCSyncs API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,ConcurrentGCSyncs
API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,EnableGarbageCollector API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,EnableGarbageCollector

View File

@ -48,6 +48,7 @@ func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, er
ctx.InformerFactory.Discovery().V1beta1().EndpointSlices(), ctx.InformerFactory.Discovery().V1beta1().EndpointSlices(),
ctx.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice, ctx.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
ctx.ClientBuilder.ClientOrDie("endpointslice-controller"), ctx.ClientBuilder.ClientOrDie("endpointslice-controller"),
ctx.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(ctx.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Stop) ).Run(int(ctx.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Stop)
return nil, true, nil return nil, true, nil
} }

View File

@ -18,6 +18,7 @@ package options
import ( import (
"fmt" "fmt"
"github.com/spf13/pflag" "github.com/spf13/pflag"
endpointsliceconfig "k8s.io/kubernetes/pkg/controller/endpointslice/config" endpointsliceconfig "k8s.io/kubernetes/pkg/controller/endpointslice/config"
@ -43,6 +44,7 @@ func (o *EndpointSliceControllerOptions) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&o.ConcurrentServiceEndpointSyncs, "concurrent-service-endpoint-syncs", o.ConcurrentServiceEndpointSyncs, "The number of service endpoint syncing operations that will be done concurrently. Larger number = faster endpoint slice updating, but more CPU (and network) load. Defaults to 5.") fs.Int32Var(&o.ConcurrentServiceEndpointSyncs, "concurrent-service-endpoint-syncs", o.ConcurrentServiceEndpointSyncs, "The number of service endpoint syncing operations that will be done concurrently. Larger number = faster endpoint slice updating, but more CPU (and network) load. Defaults to 5.")
fs.Int32Var(&o.MaxEndpointsPerSlice, "max-endpoints-per-slice", o.MaxEndpointsPerSlice, "The maximum number of endpoints that will be added to an EndpointSlice. More endpoints per slice will result in less endpoint slices, but larger resources. Defaults to 100.") fs.Int32Var(&o.MaxEndpointsPerSlice, "max-endpoints-per-slice", o.MaxEndpointsPerSlice, "The maximum number of endpoints that will be added to an EndpointSlice. More endpoints per slice will result in less endpoint slices, but larger resources. Defaults to 100.")
fs.DurationVar(&o.EndpointUpdatesBatchPeriod.Duration, "endpointslice-updates-batch-period", o.EndpointUpdatesBatchPeriod.Duration, "The length of endpoint slice updates batching period. Processing of pod changes will be delayed by this duration to join them with potential upcoming updates and reduce the overall number of endpoints updates. Larger number = higher endpoint programming latency, but lower number of endpoints revision generated")
} }
// ApplyTo fills up EndpointSliceController config with options. // ApplyTo fills up EndpointSliceController config with options.
@ -53,6 +55,7 @@ func (o *EndpointSliceControllerOptions) ApplyTo(cfg *endpointsliceconfig.Endpoi
cfg.ConcurrentServiceEndpointSyncs = o.ConcurrentServiceEndpointSyncs cfg.ConcurrentServiceEndpointSyncs = o.ConcurrentServiceEndpointSyncs
cfg.MaxEndpointsPerSlice = o.MaxEndpointsPerSlice cfg.MaxEndpointsPerSlice = o.MaxEndpointsPerSlice
cfg.EndpointUpdatesBatchPeriod = o.EndpointUpdatesBatchPeriod
return nil return nil
} }

View File

@ -9,6 +9,7 @@ go_library(
], ],
importpath = "k8s.io/kubernetes/pkg/controller/endpointslice/config", importpath = "k8s.io/kubernetes/pkg/controller/endpointslice/config",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library"],
) )
filegroup( filegroup(

View File

@ -16,6 +16,10 @@ limitations under the License.
package config package config
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EndpointSliceControllerConfiguration contains elements describing // EndpointSliceControllerConfiguration contains elements describing
// EndpointSliceController. // EndpointSliceController.
type EndpointSliceControllerConfiguration struct { type EndpointSliceControllerConfiguration struct {
@ -28,4 +32,11 @@ type EndpointSliceControllerConfiguration struct {
// added to an EndpointSlice. More endpoints per slice will result in fewer // added to an EndpointSlice. More endpoints per slice will result in fewer
// and larger endpoint slices, but larger resources. // and larger endpoint slices, but larger resources.
MaxEndpointsPerSlice int32 MaxEndpointsPerSlice int32
// EndpointUpdatesBatchPeriod can be used to batch endpoint updates.
// All updates of endpoint triggered by pod change will be delayed by up to
// 'EndpointUpdatesBatchPeriod'. If other pods in the same endpoint change
// in that period, they will be batched to a single endpoint update.
// Default 0 value means that each pod update triggers an endpoint update.
EndpointUpdatesBatchPeriod metav1.Duration
} }

View File

@ -61,12 +61,14 @@ func RegisterConversions(s *runtime.Scheme) error {
func autoConvert_v1alpha1_EndpointSliceControllerConfiguration_To_config_EndpointSliceControllerConfiguration(in *v1alpha1.EndpointSliceControllerConfiguration, out *config.EndpointSliceControllerConfiguration, s conversion.Scope) error { func autoConvert_v1alpha1_EndpointSliceControllerConfiguration_To_config_EndpointSliceControllerConfiguration(in *v1alpha1.EndpointSliceControllerConfiguration, out *config.EndpointSliceControllerConfiguration, s conversion.Scope) error {
out.ConcurrentServiceEndpointSyncs = in.ConcurrentServiceEndpointSyncs out.ConcurrentServiceEndpointSyncs = in.ConcurrentServiceEndpointSyncs
out.MaxEndpointsPerSlice = in.MaxEndpointsPerSlice out.MaxEndpointsPerSlice = in.MaxEndpointsPerSlice
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return nil return nil
} }
func autoConvert_config_EndpointSliceControllerConfiguration_To_v1alpha1_EndpointSliceControllerConfiguration(in *config.EndpointSliceControllerConfiguration, out *v1alpha1.EndpointSliceControllerConfiguration, s conversion.Scope) error { func autoConvert_config_EndpointSliceControllerConfiguration_To_v1alpha1_EndpointSliceControllerConfiguration(in *config.EndpointSliceControllerConfiguration, out *v1alpha1.EndpointSliceControllerConfiguration, s conversion.Scope) error {
out.ConcurrentServiceEndpointSyncs = in.ConcurrentServiceEndpointSyncs out.ConcurrentServiceEndpointSyncs = in.ConcurrentServiceEndpointSyncs
out.MaxEndpointsPerSlice = in.MaxEndpointsPerSlice out.MaxEndpointsPerSlice = in.MaxEndpointsPerSlice
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return nil return nil
} }

View File

@ -23,6 +23,7 @@ package config
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EndpointSliceControllerConfiguration) DeepCopyInto(out *EndpointSliceControllerConfiguration) { func (in *EndpointSliceControllerConfiguration) DeepCopyInto(out *EndpointSliceControllerConfiguration) {
*out = *in *out = *in
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return return
} }

View File

@ -66,6 +66,7 @@ func NewController(podInformer coreinformers.PodInformer,
endpointSliceInformer discoveryinformers.EndpointSliceInformer, endpointSliceInformer discoveryinformers.EndpointSliceInformer,
maxEndpointsPerSlice int32, maxEndpointsPerSlice int32,
client clientset.Interface, client clientset.Interface,
endpointUpdatesBatchPeriod time.Duration,
) *Controller { ) *Controller {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof) broadcaster.StartLogging(klog.Infof)
@ -129,6 +130,7 @@ func NewController(podInformer coreinformers.PodInformer,
c.eventBroadcaster = broadcaster c.eventBroadcaster = broadcaster
c.eventRecorder = recorder c.eventRecorder = recorder
c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
c.serviceSelectorCache = endpointutil.NewServiceSelectorCache() c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
return c return c
@ -194,6 +196,10 @@ type Controller struct {
// process the queue of service and pod changes // process the queue of service and pod changes
workerLoopPeriod time.Duration workerLoopPeriod time.Duration
// endpointUpdatesBatchPeriod is an artificial delay added to all service syncs triggered by pod changes.
// This can be used to reduce overall number of all endpoint slice updates.
endpointUpdatesBatchPeriod time.Duration
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls // serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
// to AsSelectorPreValidated (see #73527) // to AsSelectorPreValidated (see #73527)
serviceSelectorCache *endpointutil.ServiceSelectorCache serviceSelectorCache *endpointutil.ServiceSelectorCache
@ -414,14 +420,14 @@ func (c *Controller) addPod(obj interface{}) {
return return
} }
for key := range services { for key := range services {
c.queue.Add(key) c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
} }
} }
func (c *Controller) updatePod(old, cur interface{}) { func (c *Controller) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged) services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged)
for key := range services { for key := range services {
c.queue.Add(key) c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
} }
} }

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"reflect" "reflect"
"strconv"
"testing" "testing"
"time" "time"
@ -51,7 +52,7 @@ type endpointSliceController struct {
serviceStore cache.Store serviceStore cache.Store
} }
func newController(nodeNames []string) (*fake.Clientset, *endpointSliceController) { func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) {
client := newClientset() client := newClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes() nodeInformer := informerFactory.Core().V1().Nodes()
@ -66,7 +67,8 @@ func newController(nodeNames []string) (*fake.Clientset, *endpointSliceControlle
nodeInformer, nodeInformer,
informerFactory.Discovery().V1beta1().EndpointSlices(), informerFactory.Discovery().V1beta1().EndpointSlices(),
int32(100), int32(100),
client) client,
batchPeriod)
esController.nodesSynced = alwaysReady esController.nodesSynced = alwaysReady
esController.podsSynced = alwaysReady esController.podsSynced = alwaysReady
@ -86,7 +88,7 @@ func newController(nodeNames []string) (*fake.Clientset, *endpointSliceControlle
func TestSyncServiceNoSelector(t *testing.T) { func TestSyncServiceNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault ns := metav1.NamespaceDefault
serviceName := "testing-1" serviceName := "testing-1"
client, esController := newController([]string{"node-1"}) client, esController := newController([]string{"node-1"}, time.Duration(0))
esController.serviceStore.Add(&v1.Service{ esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -103,7 +105,7 @@ func TestSyncServiceNoSelector(t *testing.T) {
func TestSyncServiceWithSelector(t *testing.T) { func TestSyncServiceWithSelector(t *testing.T) {
ns := metav1.NamespaceDefault ns := metav1.NamespaceDefault
serviceName := "testing-1" serviceName := "testing-1"
client, esController := newController([]string{"node-1"}) client, esController := newController([]string{"node-1"}, time.Duration(0))
standardSyncService(t, esController, ns, serviceName, "true") standardSyncService(t, esController, ns, serviceName, "true")
expectActions(t, client.Actions(), 1, "create", "endpointslices") expectActions(t, client.Actions(), 1, "create", "endpointslices")
@ -123,7 +125,7 @@ func TestSyncServiceWithSelector(t *testing.T) {
// remove too much. // remove too much.
func TestSyncServiceMissing(t *testing.T) { func TestSyncServiceMissing(t *testing.T) {
namespace := metav1.NamespaceDefault namespace := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}) client, esController := newController([]string{"node-1"}, time.Duration(0))
// Build up existing service // Build up existing service
existingServiceName := "stillthere" existingServiceName := "stillthere"
@ -159,7 +161,7 @@ func TestSyncServiceMissing(t *testing.T) {
// Ensure SyncService correctly selects Pods. // Ensure SyncService correctly selects Pods.
func TestSyncServicePodSelection(t *testing.T) { func TestSyncServicePodSelection(t *testing.T) {
client, esController := newController([]string{"node-1"}) client, esController := newController([]string{"node-1"}, time.Duration(0))
ns := metav1.NamespaceDefault ns := metav1.NamespaceDefault
pod1 := newPod(1, ns, true, 0) pod1 := newPod(1, ns, true, 0)
@ -186,7 +188,7 @@ func TestSyncServicePodSelection(t *testing.T) {
// Ensure SyncService correctly selects and labels EndpointSlices. // Ensure SyncService correctly selects and labels EndpointSlices.
func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) { func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
client, esController := newController([]string{"node-1"}) client, esController := newController([]string{"node-1"}, time.Duration(0))
ns := metav1.NamespaceDefault ns := metav1.NamespaceDefault
serviceName := "testing-1" serviceName := "testing-1"
@ -274,7 +276,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
// Ensure SyncService handles a variety of protocols and IPs appropriately. // Ensure SyncService handles a variety of protocols and IPs appropriately.
func TestSyncServiceFull(t *testing.T) { func TestSyncServiceFull(t *testing.T) {
client, esController := newController([]string{"node-1"}) client, esController := newController([]string{"node-1"}, time.Duration(0))
namespace := metav1.NamespaceDefault namespace := metav1.NamespaceDefault
serviceName := "all-the-protocols" serviceName := "all-the-protocols"
ipv6Family := v1.IPv6Protocol ipv6Family := v1.IPv6Protocol
@ -345,7 +347,389 @@ func TestSyncServiceFull(t *testing.T) {
}}, slice.Endpoints) }}, slice.Endpoints)
} }
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
type podAdd struct {
delay time.Duration
}
tests := []struct {
name string
batchPeriod time.Duration
adds []podAdd
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three adds with no batching",
batchPeriod: 0 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three adds in one batch",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three adds in two batches",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 1 * time.Second,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
go esController.Run(1, stopCh)
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for i, add := range tc.adds {
time.Sleep(add.delay)
p := newPod(i, ns, true, 0)
esController.podStore.Add(p)
esController.addPod(p)
}
time.Sleep(tc.finalDelay)
assert.Len(t, client.Actions(), tc.wantRequestCount)
// In case of error, make debugging easier.
for _, action := range client.Actions() {
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
}
})
}
}
// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
resourceVersion := 1
type podUpdate struct {
delay time.Duration
podName string
podIP string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
updates []podUpdate
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three updates with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three updates in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three updates in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 1 * time.Second,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
go esController.Run(1, stopCh)
addPods(t, esController, ns, tc.podsCount)
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.updates {
time.Sleep(update.delay)
old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
if err != nil {
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
}
if !exists {
t.Fatalf("Pod %q doesn't exist", update.podName)
}
oldPod := old.(*v1.Pod)
newPod := oldPod.DeepCopy()
newPod.Status.PodIPs[0].IP = update.podIP
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
resourceVersion++
esController.podStore.Update(newPod)
esController.updatePod(oldPod, newPod)
}
time.Sleep(tc.finalDelay)
assert.Len(t, client.Actions(), tc.wantRequestCount)
// In case of error, make debugging easier.
for _, action := range client.Actions() {
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
}
})
}
}
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
type podDelete struct {
delay time.Duration
podName string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
deletes []podDelete
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three deletes with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three deletes in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three deletes in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 1 * time.Second,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
go esController.Run(1, stopCh)
addPods(t, esController, ns, tc.podsCount)
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.deletes {
time.Sleep(update.delay)
old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
assert.Nil(t, err, "error while retrieving old value of %q: %v", update.podName, err)
assert.Equal(t, true, exists, "pod should exist")
esController.podStore.Delete(old)
esController.deletePod(old)
}
time.Sleep(tc.finalDelay)
assert.Len(t, client.Actions(), tc.wantRequestCount)
// In case of error, make debugging easier.
for _, action := range client.Actions() {
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
}
})
}
}
// Test helpers // Test helpers
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
t.Helper()
for i := 0; i < podsCount; i++ {
pod := newPod(i, namespace, true, 0)
esController.podStore.Add(pod)
}
}
func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName, managedBySetup string) { func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName, managedBySetup string) {
t.Helper() t.Helper()

View File

@ -312,6 +312,11 @@ type EndpointSliceControllerConfiguration struct {
// added to an EndpointSlice. More endpoints per slice will result in fewer // added to an EndpointSlice. More endpoints per slice will result in fewer
// and larger endpoint slices, but larger resources. // and larger endpoint slices, but larger resources.
MaxEndpointsPerSlice int32 MaxEndpointsPerSlice int32
// EndpointUpdatesBatchPeriod describes the length of endpoint updates batching period.
// Processing of pod changes will be delayed by this duration to join them with potential
// upcoming updates and reduce the overall number of endpoints updates.
EndpointUpdatesBatchPeriod metav1.Duration
} }
// GarbageCollectorControllerConfiguration contains elements describing GarbageCollectorController. // GarbageCollectorControllerConfiguration contains elements describing GarbageCollectorController.

View File

@ -143,6 +143,7 @@ func (in *EndpointControllerConfiguration) DeepCopy() *EndpointControllerConfigu
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EndpointSliceControllerConfiguration) DeepCopyInto(out *EndpointSliceControllerConfiguration) { func (in *EndpointSliceControllerConfiguration) DeepCopyInto(out *EndpointSliceControllerConfiguration) {
*out = *in *out = *in
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return return
} }