Merge pull request #13778 from smarterclayton/unready_endpoints

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2015-09-17 11:36:17 -07:00
17 changed files with 445 additions and 98 deletions

View File

@@ -319,11 +319,6 @@ func (e *EndpointController) syncService(key string) {
continue
}
if !api.IsPodReady(pod) {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
continue
}
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
@@ -332,7 +327,18 @@ func (e *EndpointController) syncService(key string) {
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
if api.IsPodReady(pod) {
subsets = append(subsets, api.EndpointSubset{
Addresses: []api.EndpointAddress{epa},
Ports: []api.EndpointPort{epp},
})
} else {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
subsets = append(subsets, api.EndpointSubset{
NotReadyAddresses: []api.EndpointAddress{epa},
Ports: []api.EndpointPort{epp},
})
}
}
}
subsets = endpoints.RepackSubsets(subsets)

View File

@@ -32,8 +32,8 @@ import (
"k8s.io/kubernetes/pkg/util"
)
func addPods(store cache.Store, namespace string, nPods int, nPorts int) {
for i := 0; i < nPods; i++ {
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
for i := 0; i < nPods+nNotReady; i++ {
p := &api.Pod{
TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()},
ObjectMeta: api.ObjectMeta{
@@ -54,6 +54,9 @@ func addPods(store cache.Store, namespace string, nPods int, nPorts int) {
},
},
}
if i >= nPods {
p.Status.Conditions[0].Status = api.ConditionFalse
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j})
@@ -298,7 +301,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 1, 1)
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@@ -321,6 +324,81 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns,
serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []api.EndpointSubset{},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []api.EndpointSubset{{
NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns,
serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []api.EndpointSubset{},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsItemsPreexisting(t *testing.T) {
ns := "bar"
testServer, endpointsHandler := makeTestServer(t, ns,
@@ -338,7 +416,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 1, 1)
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@@ -378,7 +456,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1)
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
@@ -397,8 +475,8 @@ func TestSyncEndpointsItems(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 3, 2)
addPods(endpoints.podStore.Store, "blah", 5, 2) // make sure these aren't found!
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@@ -439,7 +517,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 3, 2)
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{
@@ -499,7 +577,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
addPods(endpoints.podStore.Store, ns, 1, 1)
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{