Move Patch, AtomicPut and MasterService tests to test/integration.

This commit is contained in:
Wojciech Tyczynski 2016-06-16 13:28:30 +02:00
parent b0816b19d3
commit fe14beb980
3 changed files with 364 additions and 336 deletions

View File

@ -23,7 +23,6 @@ import (
"net/http"
"net/http/httptest"
"os"
"reflect"
gruntime "runtime"
"strconv"
"strings"
@ -34,8 +33,6 @@ import (
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
@ -49,9 +46,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/flowcontrol"
@ -280,14 +275,6 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
return apiServer.URL, configFilePath
}
func countEndpoints(eps *api.Endpoints) int {
count := 0
for i := range eps.Subsets {
count += len(eps.Subsets[i].Addresses) * len(eps.Subsets[i].Ports)
}
return count
}
func podRunning(c *client.Client, podNamespace string, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Pods(podNamespace).Get(podName)
@ -308,322 +295,6 @@ func podRunning(c *client.Client, podNamespace string, podName string) wait.Cond
}
}
func runAtomicPutTest(c *client.Client) {
svcBody := api.Service{
TypeMeta: unversioned.TypeMeta{
APIVersion: c.APIVersion().String(),
},
ObjectMeta: api.ObjectMeta{
Name: "atomicservice",
Labels: map[string]string{
"name": "atomicService",
},
},
Spec: api.ServiceSpec{
// This is here because validation requires it.
Selector: map[string]string{
"foo": "bar",
},
Ports: []api.ServicePort{{
Port: 12345,
Protocol: "TCP",
}},
SessionAffinity: "None",
},
}
services := c.Services(api.NamespaceDefault)
svc, err := services.Create(&svcBody)
if err != nil {
glog.Fatalf("Failed creating atomicService: %v", err)
}
glog.Info("Created atomicService")
testLabels := labels.Set{
"foo": "bar",
}
for i := 0; i < 5; i++ {
// a: z, b: y, etc...
testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)})
}
var wg sync.WaitGroup
wg.Add(len(testLabels))
for label, value := range testLabels {
go func(l, v string) {
for {
glog.Infof("Starting to update (%s, %s)", l, v)
tmpSvc, err := services.Get(svc.Name)
if err != nil {
glog.Errorf("Error getting atomicService: %v", err)
continue
}
if tmpSvc.Spec.Selector == nil {
tmpSvc.Spec.Selector = map[string]string{l: v}
} else {
tmpSvc.Spec.Selector[l] = v
}
glog.Infof("Posting update (%s, %s)", l, v)
tmpSvc, err = services.Update(tmpSvc)
if err != nil {
if apierrors.IsConflict(err) {
glog.Infof("Conflict: (%s, %s)", l, v)
// This is what we expect.
continue
}
glog.Errorf("Unexpected error putting atomicService: %v", err)
continue
}
break
}
glog.Infof("Done update (%s, %s)", l, v)
wg.Done()
}(label, value)
}
wg.Wait()
svc, err = services.Get(svc.Name)
if err != nil {
glog.Fatalf("Failed getting atomicService after writers are complete: %v", err)
}
if !reflect.DeepEqual(testLabels, labels.Set(svc.Spec.Selector)) {
glog.Fatalf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, svc.Spec.Selector)
}
glog.Info("Atomic PUTs work.")
}
func runPatchTest(c *client.Client) {
name := "patchservice"
resource := "services"
svcBody := api.Service{
TypeMeta: unversioned.TypeMeta{
APIVersion: c.APIVersion().String(),
},
ObjectMeta: api.ObjectMeta{
Name: name,
Labels: map[string]string{},
},
Spec: api.ServiceSpec{
// This is here because validation requires it.
Selector: map[string]string{
"foo": "bar",
},
Ports: []api.ServicePort{{
Port: 12345,
Protocol: "TCP",
}},
SessionAffinity: "None",
},
}
services := c.Services(api.NamespaceDefault)
svc, err := services.Create(&svcBody)
if err != nil {
glog.Fatalf("Failed creating patchservice: %v", err)
}
patchBodies := map[unversioned.GroupVersion]map[api.PatchType]struct {
AddLabelBody []byte
RemoveLabelBody []byte
RemoveAllLabelsBody []byte
}{
v1.SchemeGroupVersion: {
api.JSONPatchType: {
[]byte(`[{"op":"add","path":"/metadata/labels","value":{"foo":"bar","baz":"qux"}}]`),
[]byte(`[{"op":"remove","path":"/metadata/labels/foo"}]`),
[]byte(`[{"op":"remove","path":"/metadata/labels"}]`),
},
api.MergePatchType: {
[]byte(`{"metadata":{"labels":{"foo":"bar","baz":"qux"}}}`),
[]byte(`{"metadata":{"labels":{"foo":null}}}`),
[]byte(`{"metadata":{"labels":null}}`),
},
api.StrategicMergePatchType: {
[]byte(`{"metadata":{"labels":{"foo":"bar","baz":"qux"}}}`),
[]byte(`{"metadata":{"labels":{"foo":null}}}`),
[]byte(`{"metadata":{"labels":{"$patch":"replace"}}}`),
},
},
}
pb := patchBodies[c.APIVersion()]
execPatch := func(pt api.PatchType, body []byte) error {
return c.Patch(pt).
Resource(resource).
Namespace(api.NamespaceDefault).
Name(name).
Body(body).
Do().
Error()
}
for k, v := range pb {
// add label
err := execPatch(k, v.AddLabelBody)
if err != nil {
glog.Fatalf("Failed updating patchservice with patch type %s: %v", k, err)
}
svc, err = services.Get(name)
if err != nil {
glog.Fatalf("Failed getting patchservice: %v", err)
}
if len(svc.Labels) != 2 || svc.Labels["foo"] != "bar" || svc.Labels["baz"] != "qux" {
glog.Fatalf("Failed updating patchservice with patch type %s: labels are: %v", k, svc.Labels)
}
// remove one label
err = execPatch(k, v.RemoveLabelBody)
if err != nil {
glog.Fatalf("Failed updating patchservice with patch type %s: %v", k, err)
}
svc, err = services.Get(name)
if err != nil {
glog.Fatalf("Failed getting patchservice: %v", err)
}
if len(svc.Labels) != 1 || svc.Labels["baz"] != "qux" {
glog.Fatalf("Failed updating patchservice with patch type %s: labels are: %v", k, svc.Labels)
}
// remove all labels
err = execPatch(k, v.RemoveAllLabelsBody)
if err != nil {
glog.Fatalf("Failed updating patchservice with patch type %s: %v", k, err)
}
svc, err = services.Get(name)
if err != nil {
glog.Fatalf("Failed getting patchservice: %v", err)
}
if svc.Labels != nil {
glog.Fatalf("Failed remove all labels from patchservice with patch type %s: %v", k, svc.Labels)
}
}
// Test patch with a resource that allows create on update
endpointTemplate := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "patchendpoint"},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 80, Protocol: api.ProtocolTCP}},
},
},
}
patchEndpoint := func(json []byte) (runtime.Object, error) {
return c.Patch(api.MergePatchType).Resource("endpoints").Namespace(api.NamespaceDefault).Name("patchendpoint").Body(json).Do().Get()
}
// Make sure patch doesn't get to CreateOnUpdate
{
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
glog.Fatalf("Failed creating endpoint JSON: %v", err)
}
if obj, err := patchEndpoint(endpointJSON); !apierrors.IsNotFound(err) {
glog.Fatalf("Expected notfound creating from patch, got error=%v and object: %#v", err, obj)
}
}
// Create the endpoint (endpoints set AllowCreateOnUpdate=true) to get a UID and resource version
createdEndpoint, err := c.Endpoints(api.NamespaceDefault).Update(endpointTemplate)
if err != nil {
glog.Fatalf("Failed creating endpoint: %v", err)
}
// Make sure identity patch is accepted
{
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), createdEndpoint)
if err != nil {
glog.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); err != nil {
glog.Fatalf("Failed patching endpoint: %v", err)
}
}
// Make sure patch complains about a mismatched resourceVersion
{
endpointTemplate.Name = ""
endpointTemplate.UID = ""
endpointTemplate.ResourceVersion = "1"
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
glog.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); !apierrors.IsConflict(err) {
glog.Fatalf("Expected error, got %#v", err)
}
}
// Make sure patch complains about mutating the UID
{
endpointTemplate.Name = ""
endpointTemplate.UID = "abc"
endpointTemplate.ResourceVersion = ""
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
glog.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); !apierrors.IsInvalid(err) {
glog.Fatalf("Expected error, got %#v", err)
}
}
// Make sure patch complains about a mismatched name
{
endpointTemplate.Name = "changedname"
endpointTemplate.UID = ""
endpointTemplate.ResourceVersion = ""
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
glog.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); !apierrors.IsBadRequest(err) {
glog.Fatalf("Expected error, got %#v", err)
}
}
// Make sure patch containing originally submitted JSON is accepted
{
endpointTemplate.Name = ""
endpointTemplate.UID = ""
endpointTemplate.ResourceVersion = ""
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
glog.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); err != nil {
glog.Fatalf("Failed patching endpoint: %v", err)
}
}
glog.Info("PATCHs work.")
}
func runMasterServiceTest(client *client.Client) {
time.Sleep(12 * time.Second)
svcList, err := client.Services(api.NamespaceDefault).List(api.ListOptions{})
if err != nil {
glog.Fatalf("Unexpected error listing services: %v", err)
}
var foundRW bool
found := sets.String{}
for i := range svcList.Items {
found.Insert(svcList.Items[i].Name)
if svcList.Items[i].Name == "kubernetes" {
foundRW = true
}
}
if foundRW {
ep, err := client.Endpoints(api.NamespaceDefault).Get("kubernetes")
if err != nil {
glog.Fatalf("Unexpected error listing endpoints for kubernetes service: %v", err)
}
if countEndpoints(ep) == 0 {
glog.Fatalf("No endpoints for kubernetes service: %v", ep)
}
} else {
glog.Errorf("No RW service found: %v", found)
glog.Fatal("Kubernetes service test failed")
}
glog.Infof("Master service test passed.")
}
func runSchedulerNoPhantomPodsTest(client *client.Client) {
pod := &api.Pod{
Spec: api.PodSpec{
@ -737,11 +408,7 @@ func main() {
})
// Run tests in parallel
testFuncs := []testFunc{
runAtomicPutTest,
runPatchTest,
runMasterServiceTest,
}
testFuncs := []testFunc{}
// Only run at most maxConcurrency tests in parallel.
if maxConcurrency <= 0 {

View File

@ -22,17 +22,20 @@ import (
"fmt"
"log"
"reflect"
"runtime"
rt "runtime"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
@ -110,6 +113,307 @@ func TestClient(t *testing.T) {
}
}
func TestAtomicPut(t *testing.T) {
_, s := framework.RunAMaster(t)
defer s.Close()
framework.DeleteAllEtcdKeys()
c := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
rcBody := api.ReplicationController{
TypeMeta: unversioned.TypeMeta{
APIVersion: c.APIVersion().String(),
},
ObjectMeta: api.ObjectMeta{
Name: "atomicrc",
Labels: map[string]string{
"name": "atomicrc",
},
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{
"foo": "bar",
},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "name", Image: "image"},
},
},
},
},
}
rcs := c.ReplicationControllers(api.NamespaceDefault)
rc, err := rcs.Create(&rcBody)
if err != nil {
t.Fatalf("Failed creating atomicRC: %v", err)
}
testLabels := labels.Set{
"foo": "bar",
}
for i := 0; i < 5; i++ {
// a: z, b: y, etc...
testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)})
}
var wg sync.WaitGroup
wg.Add(len(testLabels))
for label, value := range testLabels {
go func(l, v string) {
defer wg.Done()
for {
tmpRC, err := rcs.Get(rc.Name)
if err != nil {
t.Errorf("Error getting atomicRC: %v", err)
continue
}
if tmpRC.Spec.Selector == nil {
tmpRC.Spec.Selector = map[string]string{l: v}
tmpRC.Spec.Template.Labels = map[string]string{l: v}
} else {
tmpRC.Spec.Selector[l] = v
tmpRC.Spec.Template.Labels[l] = v
}
tmpRC, err = rcs.Update(tmpRC)
if err != nil {
if apierrors.IsConflict(err) {
// This is what we expect.
continue
}
t.Errorf("Unexpected error putting atomicRC: %v", err)
continue
}
return
}
}(label, value)
}
wg.Wait()
rc, err = rcs.Get(rc.Name)
if err != nil {
t.Fatalf("Failed getting atomicRC after writers are complete: %v", err)
}
if !reflect.DeepEqual(testLabels, labels.Set(rc.Spec.Selector)) {
t.Errorf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, rc.Spec.Selector)
}
}
func TestPatch(t *testing.T) {
_, s := framework.RunAMaster(t)
defer s.Close()
framework.DeleteAllEtcdKeys()
c := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
name := "patchpod"
resource := "pods"
podBody := api.Pod{
TypeMeta: unversioned.TypeMeta{
APIVersion: c.APIVersion().String(),
},
ObjectMeta: api.ObjectMeta{
Name: name,
Labels: map[string]string{},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "name", Image: "image"},
},
},
}
pods := c.Pods(api.NamespaceDefault)
pod, err := pods.Create(&podBody)
if err != nil {
t.Fatalf("Failed creating patchpods: %v", err)
}
patchBodies := map[unversioned.GroupVersion]map[api.PatchType]struct {
AddLabelBody []byte
RemoveLabelBody []byte
RemoveAllLabelsBody []byte
}{
v1.SchemeGroupVersion: {
api.JSONPatchType: {
[]byte(`[{"op":"add","path":"/metadata/labels","value":{"foo":"bar","baz":"qux"}}]`),
[]byte(`[{"op":"remove","path":"/metadata/labels/foo"}]`),
[]byte(`[{"op":"remove","path":"/metadata/labels"}]`),
},
api.MergePatchType: {
[]byte(`{"metadata":{"labels":{"foo":"bar","baz":"qux"}}}`),
[]byte(`{"metadata":{"labels":{"foo":null}}}`),
[]byte(`{"metadata":{"labels":null}}`),
},
api.StrategicMergePatchType: {
[]byte(`{"metadata":{"labels":{"foo":"bar","baz":"qux"}}}`),
[]byte(`{"metadata":{"labels":{"foo":null}}}`),
[]byte(`{"metadata":{"labels":{"$patch":"replace"}}}`),
},
},
}
pb := patchBodies[c.APIVersion()]
execPatch := func(pt api.PatchType, body []byte) error {
return c.Patch(pt).
Resource(resource).
Namespace(api.NamespaceDefault).
Name(name).
Body(body).
Do().
Error()
}
for k, v := range pb {
// add label
err := execPatch(k, v.AddLabelBody)
if err != nil {
t.Fatalf("Failed updating patchpod with patch type %s: %v", k, err)
}
pod, err = pods.Get(name)
if err != nil {
t.Fatalf("Failed getting patchpod: %v", err)
}
if len(pod.Labels) != 2 || pod.Labels["foo"] != "bar" || pod.Labels["baz"] != "qux" {
t.Errorf("Failed updating patchpod with patch type %s: labels are: %v", k, pod.Labels)
}
// remove one label
err = execPatch(k, v.RemoveLabelBody)
if err != nil {
t.Fatalf("Failed updating patchpod with patch type %s: %v", k, err)
}
pod, err = pods.Get(name)
if err != nil {
t.Fatalf("Failed getting patchpod: %v", err)
}
if len(pod.Labels) != 1 || pod.Labels["baz"] != "qux" {
t.Errorf("Failed updating patchpod with patch type %s: labels are: %v", k, pod.Labels)
}
// remove all labels
err = execPatch(k, v.RemoveAllLabelsBody)
if err != nil {
t.Fatalf("Failed updating patchpod with patch type %s: %v", k, err)
}
pod, err = pods.Get(name)
if err != nil {
t.Fatalf("Failed getting patchpod: %v", err)
}
if pod.Labels != nil {
t.Errorf("Failed remove all labels from patchpod with patch type %s: %v", k, pod.Labels)
}
}
}
func TestPatchWithCreateOnUpdate(t *testing.T) {
_, s := framework.RunAMaster(t)
defer s.Close()
framework.DeleteAllEtcdKeys()
c := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpointTemplate := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "patchendpoint"},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 80, Protocol: api.ProtocolTCP}},
},
},
}
patchEndpoint := func(json []byte) (runtime.Object, error) {
return c.Patch(api.MergePatchType).Resource("endpoints").Namespace(api.NamespaceDefault).Name("patchendpoint").Body(json).Do().Get()
}
// Make sure patch doesn't get to CreateOnUpdate
{
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
t.Fatalf("Failed creating endpoint JSON: %v", err)
}
if obj, err := patchEndpoint(endpointJSON); !apierrors.IsNotFound(err) {
t.Errorf("Expected notfound creating from patch, got error=%v and object: %#v", err, obj)
}
}
// Create the endpoint (endpoints set AllowCreateOnUpdate=true) to get a UID and resource version
createdEndpoint, err := c.Endpoints(api.NamespaceDefault).Update(endpointTemplate)
if err != nil {
t.Fatalf("Failed creating endpoint: %v", err)
}
// Make sure identity patch is accepted
{
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), createdEndpoint)
if err != nil {
t.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); err != nil {
t.Errorf("Failed patching endpoint: %v", err)
}
}
// Make sure patch complains about a mismatched resourceVersion
{
endpointTemplate.Name = ""
endpointTemplate.UID = ""
endpointTemplate.ResourceVersion = "1"
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
t.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); !apierrors.IsConflict(err) {
t.Errorf("Expected error, got %#v", err)
}
}
// Make sure patch complains about mutating the UID
{
endpointTemplate.Name = ""
endpointTemplate.UID = "abc"
endpointTemplate.ResourceVersion = ""
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
t.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); !apierrors.IsInvalid(err) {
t.Errorf("Expected error, got %#v", err)
}
}
// Make sure patch complains about a mismatched name
{
endpointTemplate.Name = "changedname"
endpointTemplate.UID = ""
endpointTemplate.ResourceVersion = ""
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
t.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); !apierrors.IsBadRequest(err) {
t.Errorf("Expected error, got %#v", err)
}
}
// Make sure patch containing originally submitted JSON is accepted
{
endpointTemplate.Name = ""
endpointTemplate.UID = ""
endpointTemplate.ResourceVersion = ""
endpointJSON, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
if err != nil {
t.Fatalf("Failed creating endpoint JSON: %v", err)
}
if _, err := patchEndpoint(endpointJSON); err != nil {
t.Errorf("Failed patching endpoint: %v", err)
}
}
}
func TestAPIVersions(t *testing.T) {
_, s := framework.RunAMaster(t)
defer s.Close()
@ -215,7 +519,7 @@ func TestMultiWatch(t *testing.T) {
// TODO: Reenable this test when we get #6059 resolved.
return
const watcherCount = 50
runtime.GOMAXPROCS(watcherCount)
rt.GOMAXPROCS(watcherCount)
framework.DeleteAllEtcdKeys()
defer framework.DeleteAllEtcdKeys()

View File

@ -21,15 +21,22 @@ package integration
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/ghodss/yaml"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/integration/framework"
)
@ -371,3 +378,53 @@ func TestAccept(t *testing.T) {
t.Errorf("unexpected error from the server")
}
}
func countEndpoints(eps *api.Endpoints) int {
count := 0
for i := range eps.Subsets {
count += len(eps.Subsets[i].Addresses) * len(eps.Subsets[i].Ports)
}
return count
}
func TestMasterService(t *testing.T) {
m, err := master.New(framework.NewIntegrationTestMasterConfig())
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req)
}))
defer s.Close()
framework.DeleteAllEtcdKeys()
client := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
err = wait.Poll(time.Second, time.Minute, func() (bool, error) {
svcList, err := client.Services(api.NamespaceDefault).List(api.ListOptions{})
if err != nil {
t.Errorf("unexpected error: %v", err)
return false, nil
}
found := false
for i := range svcList.Items {
if svcList.Items[i].Name == "kubernetes" {
found = true
}
}
if found {
ep, err := client.Endpoints(api.NamespaceDefault).Get("kubernetes")
if err != nil {
return false, nil
}
if countEndpoints(ep) == 0 {
return false, fmt.Errorf("no endpoints for kubernetes service: %v", ep)
}
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}