From 690df95955bcd1bf9591bced076b45f0c4d09a1b Mon Sep 17 00:00:00 2001 From: Maisem Ali Date: Wed, 5 Jul 2017 12:40:35 -0700 Subject: [PATCH 1/2] Adding cassandra test server manifests. --- .../statefulset/cassandra/pdb.yaml | 11 +++ .../statefulset/cassandra/service.yaml | 12 +++ .../statefulset/cassandra/statefulset.yaml | 86 +++++++++++++++++++ .../statefulset/cassandra/tester.yaml | 48 +++++++++++ 4 files changed, 157 insertions(+) create mode 100644 test/e2e/testing-manifests/statefulset/cassandra/pdb.yaml create mode 100644 test/e2e/testing-manifests/statefulset/cassandra/service.yaml create mode 100644 test/e2e/testing-manifests/statefulset/cassandra/statefulset.yaml create mode 100644 test/e2e/testing-manifests/statefulset/cassandra/tester.yaml diff --git a/test/e2e/testing-manifests/statefulset/cassandra/pdb.yaml b/test/e2e/testing-manifests/statefulset/cassandra/pdb.yaml new file mode 100644 index 00000000000..9944d2b387f --- /dev/null +++ b/test/e2e/testing-manifests/statefulset/cassandra/pdb.yaml @@ -0,0 +1,11 @@ +apiVersion: policy/v1beta1 +kind: PodDisruptionBudget +metadata: + name: cassandra-pdb + labels: + pdb: cassandra +spec: + minAvailable: 2 + selector: + matchLabels: + app: cassandra diff --git a/test/e2e/testing-manifests/statefulset/cassandra/service.yaml b/test/e2e/testing-manifests/statefulset/cassandra/service.yaml new file mode 100644 index 00000000000..35b07733b5c --- /dev/null +++ b/test/e2e/testing-manifests/statefulset/cassandra/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: cassandra + name: cassandra +spec: + clusterIP: None + ports: + - port: 9042 + selector: + app: cassandra diff --git a/test/e2e/testing-manifests/statefulset/cassandra/statefulset.yaml b/test/e2e/testing-manifests/statefulset/cassandra/statefulset.yaml new file mode 100644 index 00000000000..269d0a18f63 --- /dev/null +++ b/test/e2e/testing-manifests/statefulset/cassandra/statefulset.yaml @@ -0,0 +1,86 @@ +apiVersion: 
"apps/v1beta1" +kind: StatefulSet +metadata: + name: cassandra +spec: + serviceName: cassandra + replicas: 3 + template: + metadata: + labels: + app: cassandra + spec: + containers: + - name: cassandra + image: gcr.io/google-samples/cassandra:v12 + imagePullPolicy: Always + ports: + - containerPort: 7000 + name: intra-node + - containerPort: 7001 + name: tls-intra-node + - containerPort: 7199 + name: jmx + - containerPort: 9042 + name: cql + resources: + requests: + cpu: "300m" + memory: 1Gi + securityContext: + capabilities: + add: + - IPC_LOCK + lifecycle: + preStop: + exec: + command: ["/bin/sh", "-c", "PID=$(pidof java) && kill $PID && while ps -p $PID > /dev/null; do sleep 1; done"] + env: + - name: MAX_HEAP_SIZE + value: 512M + - name: HEAP_NEWSIZE + value: 100M + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: CASSANDRA_SEEDS + value: "cassandra-0.cassandra.$(POD_NAMESPACE).svc.cluster.local" + - name: CASSANDRA_CLUSTER_NAME + value: "K8Demo" + - name: CASSANDRA_DC + value: "DC1-K8Demo" + - name: CASSANDRA_RACK + value: "Rack1-K8Demo" + - name: CASSANDRA_AUTO_BOOTSTRAP + value: "false" + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + readinessProbe: + exec: + command: + - /bin/bash + - -c + - /ready-probe.sh + initialDelaySeconds: 15 + timeoutSeconds: 5 + # These volume mounts are persistent. They are like inline claims, + # but not exactly because the names need to match exactly one of + # the stateful pod volumes. + volumeMounts: + - name: cassandra-data + mountPath: /cassandra_data + # These are converted to volume claims by the controller + # and mounted at the paths mentioned above. 
+ # do not use these in production until ssd GCEPersistentDisk or other ssd pd + volumeClaimTemplates: + - metadata: + name: cassandra-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi + diff --git a/test/e2e/testing-manifests/statefulset/cassandra/tester.yaml b/test/e2e/testing-manifests/statefulset/cassandra/tester.yaml new file mode 100644 index 00000000000..65699f8e3cf --- /dev/null +++ b/test/e2e/testing-manifests/statefulset/cassandra/tester.yaml @@ -0,0 +1,48 @@ +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: cassandra-test-server +spec: + replicas: 3 + template: + metadata: + labels: + app: test-server + spec: + containers: + - name: test-server + image: gcr.io/google-containers/cassandra-e2e-test:0.1 + imagePullPolicy: Always + ports: + - containerPort: 8080 + readinessProbe: + httpGet: + path: /healthz + port: 8080 + initialDelaySeconds: 2 + periodSeconds: 2 +--- +apiVersion: policy/v1beta1 +kind: PodDisruptionBudget +metadata: + name: tester-pdb + labels: + pdb: test-server +spec: + minAvailable: 1 + selector: + matchLabels: + app: test-server +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: test-server + name: test-server +spec: + ports: + - port: 8080 + selector: + app: test-server + type: LoadBalancer From 3ea11b2ba54cbcfb5be0241ec3e4f7ae995b3dab Mon Sep 17 00:00:00 2001 From: Maisem Ali Date: Wed, 5 Jul 2017 12:41:13 -0700 Subject: [PATCH 2/2] Adding cassandra test. 
---
 test/e2e/upgrades/BUILD        |   1 +
 test/e2e/upgrades/cassandra.go | 216 +++++++++++++++++++++++++++++++++
 2 files changed, 217 insertions(+)
 create mode 100644 test/e2e/upgrades/cassandra.go

diff --git a/test/e2e/upgrades/BUILD b/test/e2e/upgrades/BUILD
index fff84466aea..fafed1c0b1e 100644
--- a/test/e2e/upgrades/BUILD
+++ b/test/e2e/upgrades/BUILD
@@ -11,6 +11,7 @@ go_library(
     name = "go_default_library",
     srcs = [
         "apparmor.go",
+        "cassandra.go",
         "configmaps.go",
         "etcd.go",
         "horizontal_pod_autoscalers.go",
diff --git a/test/e2e/upgrades/cassandra.go b/test/e2e/upgrades/cassandra.go
new file mode 100644
index 00000000000..b96a6d3eb76
--- /dev/null
+++ b/test/e2e/upgrades/cassandra.go
@@ -0,0 +1,216 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package upgrades
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"path/filepath"
+	"sync"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/util/version"
+	"k8s.io/kubernetes/test/e2e/framework"
+)
+
+const cassandraManifestPath = "test/e2e/testing-manifests/statefulset/cassandra"
+
+// CassandraUpgradeTest ups and verifies that a Cassandra StatefulSet behaves
+// well across upgrades.
+type CassandraUpgradeTest struct {
+	ip               string
+	successfulWrites int
+	ssTester         *framework.StatefulSetTester
+}
+
+// Name returns the tracking name of the test.
+func (CassandraUpgradeTest) Name() string { return "cassandra-upgrade" }
+
+// Skip returns true when this test can be skipped.
+func (CassandraUpgradeTest) Skip(upgCtx UpgradeContext) bool {
+	minVersion := version.MustParseSemantic("1.6.0")
+	for _, vCtx := range upgCtx.Versions {
+		if vCtx.Version.LessThan(minVersion) {
+			return true
+		}
+	}
+	return false
+}
+
+func cassandraKubectlCreate(ns, file string) {
+	path := filepath.Join(framework.TestContext.RepoRoot, cassandraManifestPath, file)
+	framework.RunKubectlOrDie("create", "-f", path, fmt.Sprintf("--namespace=%s", ns))
+}
+
+// Setup creates a Cassandra StatefulSet and a PDB. It also brings up a tester
+// ReplicaSet and associated service and PDB to guarantee availability during
+// the upgrade.
+// It waits for the system to stabilize before adding two users to verify
+// connectivity.
+func (t *CassandraUpgradeTest) Setup(f *framework.Framework) {
+	ns := f.Namespace.Name
+	statefulsetPoll := 30 * time.Second
+	statefulsetTimeout := 10 * time.Minute
+	t.ssTester = framework.NewStatefulSetTester(f.ClientSet)
+
+	By("Creating a PDB")
+	cassandraKubectlCreate(ns, "pdb.yaml")
+
+	By("Creating a Cassandra StatefulSet")
+	t.ssTester.CreateStatefulSet(cassandraManifestPath, ns)
+
+	By("Creating a cassandra-test-server deployment")
+	cassandraKubectlCreate(ns, "tester.yaml")
+
+	By("Getting the ingress IPs from the services")
+	err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
+		if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" {
+			return false, nil
+		}
+		if _, err := t.listUsers(); err != nil {
+			framework.Logf("Service endpoint is up but isn't responding")
+			return false, nil
+		}
+		return true, nil
+	})
+	Expect(err).NotTo(HaveOccurred())
+	framework.Logf("Service endpoint is up")
+
+	By("Adding 2 dummy users")
+	Expect(t.addUser("Alice")).NotTo(HaveOccurred())
+	Expect(t.addUser("Bob")).NotTo(HaveOccurred())
+	t.successfulWrites = 2
+
+	By("Verifying that the users exist")
+	users, err := t.listUsers()
+	Expect(err).NotTo(HaveOccurred())
+	Expect(len(users)).To(Equal(2))
+}
+
+// listUsers gets a list of users from the db via the tester service.
+func (t *CassandraUpgradeTest) listUsers() ([]string, error) {
+	r, err := http.Get(fmt.Sprintf("http://%s:8080/list", t.ip))
+	if err != nil {
+		return nil, err
+	}
+	defer r.Body.Close()
+	if r.StatusCode != http.StatusOK {
+		b, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			return nil, err
+		}
+		return nil, fmt.Errorf(string(b))
+	}
+	var names []string
+	if err := json.NewDecoder(r.Body).Decode(&names); err != nil {
+		return nil, err
+	}
+	return names, nil
+}
+
+// addUser adds a user to the db via the tester services.
+func (t *CassandraUpgradeTest) addUser(name string) error {
+	val := map[string][]string{"name": {name}}
+	r, err := http.PostForm(fmt.Sprintf("http://%s:8080/add", t.ip), val)
+	if err != nil {
+		return err
+	}
+	defer r.Body.Close()
+	if r.StatusCode != http.StatusOK {
+		b, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf(string(b))
+	}
+	return nil
+}
+
+// getServiceIP is a helper method to extract the Ingress IP from the service.
+func (t *CassandraUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string {
+	svc, err := f.ClientSet.CoreV1().Services(ns).Get(svcName, metav1.GetOptions{})
+	Expect(err).NotTo(HaveOccurred())
+	ingress := svc.Status.LoadBalancer.Ingress
+	if len(ingress) == 0 {
+		return ""
+	}
+	return ingress[0].IP
+}
+
+// Test is called during the upgrade.
+// It launches two goroutines, one continuously writes to the db and one reads
+// from the db. Each attempt is tallied and at the end we verify if the success
+// ratio is over a certain threshold (0.75). We also verify that we get
+// at least the same number of rows back as we successfully wrote.
+func (t *CassandraUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
+	By("Continuously polling the database during upgrade.")
+	var (
+		success, failures, writeAttempts, lastUserCount int
+		mu                                              sync.Mutex
+		errors                                          = map[string]int{}
+	)
+	// Write loop.
+	go wait.Until(func() {
+		writeAttempts++
+		if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil {
+			framework.Logf("Unable to add user: %v", err)
+			mu.Lock()
+			errors[err.Error()]++
+			mu.Unlock()
+			return
+		}
+		t.successfulWrites++
+	}, 10*time.Millisecond, done)
+	// Read loop.
+	wait.Until(func() {
+		users, err := t.listUsers()
+		if err != nil {
+			framework.Logf("Could not retrieve users: %v", err)
+			failures++
+			mu.Lock()
+			errors[err.Error()]++
+			mu.Unlock()
+			return
+		}
+		success++
+		lastUserCount = len(users)
+	}, 10*time.Millisecond, done)
+	framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites)
+
+	Expect(lastUserCount >= t.successfulWrites).To(BeTrue())
+	ratio := float64(success) / float64(success+failures)
+	framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio)
+	ratio = float64(t.successfulWrites) / float64(writeAttempts)
+	framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio)
+	framework.Logf("Errors: %v", errors)
+	// TODO(maisem): tweak this value once we have a few test runs.
+	Expect(ratio > 0.75).To(BeTrue())
+}
+
+// Teardown does one final check of the data's availability.
+func (t *CassandraUpgradeTest) Teardown(f *framework.Framework) {
+	users, err := t.listUsers()
+	Expect(err).NotTo(HaveOccurred())
+	Expect(len(users) >= t.successfulWrites).To(BeTrue())
+}