Merge pull request #79386 from khenidak/phase2-dualstack

Phase 2 dualstack
Authored by Kubernetes Prow Robot, committed by GitHub on 2019-08-28 20:39:56 -07:00
61 changed files with 3224 additions and 1151 deletions


@@ -20,6 +20,7 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/metrics:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@@ -29,6 +30,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@@ -39,6 +41,7 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
],
)
@@ -54,6 +57,7 @@ go_test(
"//pkg/api/v1/endpoints:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@@ -61,11 +65,13 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
],
)


@@ -46,6 +46,10 @@ import (
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
utilnet "k8s.io/utils/net"
)
const (
@@ -219,6 +223,37 @@ func (e *EndpointController) addPod(obj interface{}) {
}
}
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return podToEndpointAddress(pod), nil
}
// the api-server's service controller has already ensured that the service got
// the correct IP family according to user setup; here we only need to match the
// endpoint IPs' family to the service's actual IP family, i.e. we don't need to
// check service.IPFamily
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
for _, podIP := range pod.Status.PodIPs {
ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
// same family?
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
// we will have to return multiple endpoint addresses
if ipv6ClusterIP == ipv6PodIP {
return &v1.EndpointAddress{
IP: podIP.IP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}, nil
}
}
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}
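
The family-matching rule above reduces to comparing IP families with utilnet.IsIPv6String. A minimal, self-contained sketch of that selection loop (hypothetical helper name and made-up IPs, not code from this PR):

    package main

    import (
        "fmt"

        utilnet "k8s.io/utils/net"
    )

    // matchFamily returns the first pod IP whose family (v4 vs v6) matches the
    // family of the service ClusterIP, mirroring podToEndpointAddressForService.
    func matchFamily(clusterIP string, podIPs []string) (string, error) {
        wantIPv6 := utilnet.IsIPv6String(clusterIP)
        for _, ip := range podIPs {
            if utilnet.IsIPv6String(ip) == wantIPv6 {
                return ip, nil
            }
        }
        return "", fmt.Errorf("no pod IP matches the family of ClusterIP %s", clusterIP)
    }

    func main() {
        ip, _ := matchFamily("3000::1", []string{"1.2.3.4", "2000::1"})
        fmt.Println(ip) // prints "2000::1": the only pod IP in the ClusterIP's family
    }
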
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
return &v1.EndpointAddress{
IP: pod.Status.PodIP,
@@ -245,7 +280,9 @@ func podChanged(oldPod, newPod *v1.Pod) bool {
return true
}
// Convert the pod to an EndpointAddress, clear inert fields,
-// and see if they are the same.
+// and see if they are the same. Even in a dual-stack (multiple pod IP) setup,
+// a pod never changes just one of its IPs; it always changes all of them, so
+// the comparison below still detects a changed pod.
newEndpointAddress := podToEndpointAddress(newPod)
oldEndpointAddress := podToEndpointAddress(oldPod)
// Ignore the ResourceVersion because it changes
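
The idea podChanged relies on, in isolation: convert each pod snapshot to an EndpointAddress, blank the inert ResourceVersion, then deep-compare. A standalone sketch with illustrative values (not code from this PR):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        apiequality "k8s.io/apimachinery/pkg/api/equality"
    )

    func main() {
        // Two snapshots of the same pod; only the inert ResourceVersion differs.
        oldEPA := v1.EndpointAddress{IP: "1.2.3.4", TargetRef: &v1.ObjectReference{Kind: "Pod", ResourceVersion: "1"}}
        newEPA := v1.EndpointAddress{IP: "1.2.3.4", TargetRef: &v1.ObjectReference{Kind: "Pod", ResourceVersion: "2"}}
        // Clear the inert field, then compare: equal means the pod has not changed.
        oldEPA.TargetRef.ResourceVersion = ""
        newEPA.TargetRef.ResourceVersion = ""
        fmt.Println(apiequality.Semantic.DeepEqual(oldEPA, newEPA)) // true
    }
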
@@ -474,7 +511,14 @@ func (e *EndpointController) syncService(key string) error {
continue
}
-epa := *podToEndpointAddress(pod)
+ep, err := podToEndpointAddressForService(service, pod)
+if err != nil {
+// this can happen if the cluster runs with some nodes configured as dual-stack
+// and some not, such as during an upgrade
+klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
+continue
+}
+epa := *ep
hostname := pod.Spec.Hostname
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {


@@ -31,15 +31,18 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/api/testapi"
endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
)
var alwaysReady = func() bool { return true }
@@ -49,7 +52,7 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
-func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod {
+func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
@@ -77,14 +80,24 @@ func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
if makeDualstack {
p.Status.PodIPs = []v1.PodIP{
{
IP: p.Status.PodIP,
},
{
IP: fmt.Sprintf("2000::%d", id),
},
}
}
return p
}
-func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
+func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) {
for i := 0; i < nPods+nNotReady; i++ {
isReady := i < nPods
-pod := testPod(namespace, i, nPorts, isReady)
+pod := testPod(namespace, i, nPorts, isReady, makeDualstack)
store.Add(pod)
}
}
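
With the new flag, a test can seed a dual-stack pod in one call. A hedged usage sketch (hypothetical test, not part of this PR; cache and v1 are the packages this file already imports):

    func TestSeedDualStackPod(t *testing.T) {
        podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
        addPods(podStore, "test", 1, 1, 0, true /* makeDualstack */)
        pod := podStore.List()[0].(*v1.Pod)
        // makeDualstack gives the pod its v4 PodIP plus a synthetic v6 address.
        if len(pod.Status.PodIPs) != 2 {
            t.Fatalf("expected 2 pod IPs, got %d", len(pod.Status.PodIPs))
        }
    }
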
@@ -289,7 +302,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -333,7 +346,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -377,7 +390,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -418,7 +431,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
},
Subsets: []v1.EndpointSubset{},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -458,7 +471,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
},
Subsets: []v1.EndpointSubset{},
})
-addPods(endpoints.podStore, ns, 0, 1, 1)
+addPods(endpoints.podStore, ns, 0, 1, 1, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -498,7 +511,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
},
Subsets: []v1.EndpointSubset{},
})
-addPods(endpoints.podStore, ns, 1, 1, 1)
+addPods(endpoints.podStore, ns, 1, 1, 1, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -542,7 +555,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -585,7 +598,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
-addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
+addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
Spec: v1.ServiceSpec{
@@ -602,8 +615,9 @@ func TestSyncEndpointsItems(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second)
-addPods(endpoints.podStore, ns, 3, 2, 0)
-addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found!
+addPods(endpoints.podStore, ns, 3, 2, 0, false)
+addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found!
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -646,7 +660,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second)
-addPods(endpoints.podStore, ns, 3, 2, 0)
+addPods(endpoints.podStore, ns, 3, 2, 0, false)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -708,7 +722,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -758,7 +772,8 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second)
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -809,7 +824,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -967,7 +982,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
Ports: nil,
},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
@@ -1075,11 +1090,146 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
}
}
}
func TestPodToEndpointAddressForService(t *testing.T) {
testCases := []struct {
name string
expectedEndPointIP string
enableDualStack bool
expectError bool
enableDualStackPod bool
service v1.Service
}{
{
name: "v4 service, in a single stack cluster",
expectedEndPointIP: "1.2.3.4",
enableDualStack: false,
expectError: false,
enableDualStackPod: false,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
},
{
name: "v4 service, in a dual stack cluster",
expectedEndPointIP: "1.2.3.4",
enableDualStack: true,
expectError: false,
enableDualStackPod: true,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
},
{
name: "v6 service, in a dual stack cluster. dual stack enabled",
expectedEndPointIP: "2000::0",
enableDualStack: true,
expectError: false,
enableDualStackPod: true,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
},
// in reality this is a misconfigured cluster: the user is not using dual stack
// and has PodIP == v4 while ServiceIP == v6; we are testing that the existing
// (pre-dual-stack) behavior is preserved
{
name: "v6 service, in a v4 only cluster. dual stack disabled",
expectedEndPointIP: "1.2.3.4",
enableDualStack: false,
expectError: false,
enableDualStackPod: false,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
},
{
name: "v6 service, in a v4 only cluster - dual stack enabled",
expectedEndPointIP: "1.2.3.4",
enableDualStack: true,
expectError: true,
enableDualStackPod: false,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod)
pods := podStore.List()
if len(pods) != 1 {
t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
}
pod := pods[0].(*v1.Pod)
epa, err := podToEndpointAddressForService(&tc.service, pod)
if err != nil && !tc.expectError {
t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
}
if err == nil && tc.expectError {
t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
}
if err != nil && tc.expectError {
return
}
if epa.IP != tc.expectedEndPointIP {
t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
}
if *(epa.NodeName) != pod.Spec.NodeName {
t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
}
if epa.TargetRef.Kind != "Pod" {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
}
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
}
if epa.TargetRef.Name != pod.ObjectMeta.Name {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
}
if epa.TargetRef.UID != pod.ObjectMeta.UID {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
}
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
}
})
}
}
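
Each case above toggles the IPv6DualStack gate through featuregatetesting.SetFeatureGateDuringTest, which flips the gate and returns a restore function; deferring that call keeps the override from leaking into other tests. The pattern in isolation (hypothetical test name, not part of this PR):

    func TestSomethingDualStack(t *testing.T) {
        // Enable the gate for this test only; the deferred call restores the
        // previous value even if the test fails.
        defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)()
        // ... assertions that depend on dual-stack behavior ...
    }
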
func TestPodToEndpointAddress(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
-addPods(podStore, ns, 1, 1, 0)
+addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
@@ -1113,7 +1263,7 @@ func TestPodToEndpointAddress(t *testing.T) {
func TestPodChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
-addPods(podStore, ns, 1, 1, 0)
+addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
@@ -1144,6 +1294,80 @@ func TestPodChanged(t *testing.T) {
}
newPod.Status.PodIP = oldPod.Status.PodIP
/* dual stack tests */
// the primary IP changes too, because changing IPs is done by recreating the sandbox
// case 1: add a new secondary IP
newPod.Status.PodIP = "1.1.3.1"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
if !podChanged(oldPod, newPod) {
t.Errorf("Expected pod to be changed with adding secondary IP")
}
// reset
newPod.Status.PodIPs = nil
newPod.Status.PodIP = oldPod.Status.PodIP
// case 2: removing a secondary IP
saved := oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
}
if !podChanged(oldPod, newPod) {
t.Errorf("Expected pod to be changed with removal of secondary IP")
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
// case 3: change the secondary IP
saved = oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
{
IP: "2000::2",
},
}
if !podChanged(oldPod, newPod) {
t.Errorf("Expected pod to be changed with a changed secondary IP")
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
/* end dual stack testing */
newPod.ObjectMeta.Name = "wrong-name"
if !podChanged(oldPod, newPod) {
t.Errorf("Expected pod to be changed with pod name change")
@@ -1245,7 +1469,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
@@ -1295,7 +1519,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
@@ -1346,7 +1570,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
}},
})
// Neither pod nor service has trigger time, this should cause annotation to be cleared.
-addPods(endpoints.podStore, ns, 1, 1, 0)
+addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -1485,7 +1709,7 @@ func TestPodUpdatesBatching(t *testing.T) {
go endpoints.Run(1, stopCh)
-addPods(endpoints.podStore, ns, tc.podsCount, 1, 0)
+addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -1618,7 +1842,7 @@ func TestPodAddsBatching(t *testing.T) {
for i, add := range tc.adds {
time.Sleep(add.delay)
-p := testPod(ns, i, 1, true)
+p := testPod(ns, i, 1, true, false)
endpoints.podStore.Add(p)
endpoints.addPod(p)
}
@@ -1729,7 +1953,7 @@ func TestPodDeleteBatching(t *testing.T) {
go endpoints.Run(1, stopCh)
-addPods(endpoints.podStore, ns, tc.podsCount, 1, 0)
+addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},