diff --git a/test/integration/master/master_test.go b/test/integration/master/master_test.go index e38f1921df8..264826c28e4 100644 --- a/test/integration/master/master_test.go +++ b/test/integration/master/master_test.go @@ -25,7 +25,9 @@ import ( "io/ioutil" "net" "net/http" + "os" "strings" + "sync" "testing" "time" @@ -34,9 +36,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + clienttypedv1 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/integration" @@ -488,3 +492,156 @@ func TestServiceAlloc(t *testing.T) { t.Fatalf("got unexpected error: %v", err) } } + +// TestUpdateNodeObjects represents a simple version of the behavior of node checkins at steady +// state. This test allows for easy profiling of a realistic master scenario for baseline CPU +// in very large clusters. It is disabled by default - start a kube-apiserver and pass +// UPDATE_NODE_APISERVER as the host value. +func TestUpdateNodeObjects(t *testing.T) { + server := os.Getenv("UPDATE_NODE_APISERVER") + if len(server) == 0 { + t.Skip("UPDATE_NODE_APISERVER is not set") + } + c := clienttypedv1.NewForConfigOrDie(&restclient.Config{ + QPS: 10000, + Host: server, + ContentConfig: restclient.ContentConfig{ + AcceptContentTypes: "application/vnd.kubernetes.protobuf", + ContentType: "application/vnd.kubernetes.protobuf", + }, + }) + + nodes := 400 + listers := 5 + watchers := 50 + iterations := 10000 + + for i := 0; i < nodes*6; i++ { + c.Nodes().Delete(fmt.Sprintf("node-%d", i), nil) + _, err := c.Nodes().Create(&v1.Node{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("node-%d", i), + }, + }) + if err != nil { + t.Fatal(err) + } + } + + for k := 0; k < listers; k++ { + go func(lister int) { + for i := 0; i < iterations; i++ { + _, err := c.Nodes().List(v1.ListOptions{}) + if err != nil { + fmt.Printf("[list:%d] error after %d: %v\n", lister, i, err) + break + } + time.Sleep(time.Duration(lister)*10*time.Millisecond + 1500*time.Millisecond) + } + }(k) + } + + for k := 0; k < watchers; k++ { + go func(lister int) { + w, err := c.Nodes().Watch(v1.ListOptions{}) + if err != nil { + fmt.Printf("[watch:%d] error: %v", k, err) + return + } + i := 0 + for r := range w.ResultChan() { + i++ + if _, ok := r.Object.(*v1.Node); !ok { + fmt.Printf("[watch:%d] unexpected object after %d: %#v\n", lister, i, r) + } + if i%100 == 0 { + fmt.Printf("[watch:%d] iteration %d ...\n", lister, i) + } + } + fmt.Printf("[watch:%d] done\n", lister) + }(k) + } + + var wg sync.WaitGroup + wg.Add(nodes - listers) + + for j := 0; j < nodes; j++ { + go func(node int) { + var lastCount int + for i := 0; i < iterations; i++ { + if i%100 == 0 { + fmt.Printf("[%d] iteration %d ...\n", node, i) + } + if i%20 == 0 { + _, err := c.Nodes().List(v1.ListOptions{}) + if err != nil { + fmt.Printf("[%d] error after %d: %v\n", node, i, err) + break + } + } + + r, err := c.Nodes().List(v1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=node-%d", node), + ResourceVersion: "0", + }) + if err != nil { + fmt.Printf("[%d] error after %d: %v\n", node, i, err) + break + } + if len(r.Items) != 1 { + fmt.Printf("[%d] error after %d: unexpected list count\n", node, i) + break + } + + n, err := c.Nodes().Get(fmt.Sprintf("node-%d", node)) + if err != nil { + fmt.Printf("[%d] error after %d: %v\n", node, i, err) + break + } + if len(n.Status.Conditions) != lastCount { + fmt.Printf("[%d] worker set %d, read %d conditions\n", node, lastCount, len(n.Status.Conditions)) + break + } + previousCount := lastCount + switch { + case i%4 == 0: + lastCount = 1 + n.Status.Conditions = []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + Reason: "foo", + }, + } + case i%4 == 1: + lastCount = 2 + n.Status.Conditions = []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionFalse, + Reason: "foo", + }, + { + Type: v1.NodeDiskPressure, + Status: v1.ConditionTrue, + Reason: "bar", + }, + } + case i%4 == 1: + lastCount = 0 + n.Status.Conditions = nil + } + if _, err := c.Nodes().UpdateStatus(n); err != nil { + if !errors.IsConflict(err) { + fmt.Printf("[%d] error after %d: %v\n", node, i, err) + break + } + lastCount = previousCount + } + } + wg.Done() + fmt.Printf("[%d] done\n", node) + }(j) + } + wg.Wait() +}