diff --git a/test/e2e/testing-manifests/statefulset/etcd/pdb.yaml b/test/e2e/testing-manifests/statefulset/etcd/pdb.yaml
new file mode 100644
index 00000000000..64d68aac7c0
--- /dev/null
+++ b/test/e2e/testing-manifests/statefulset/etcd/pdb.yaml
@@ -0,0 +1,11 @@
+apiVersion: policy/v1beta1
+kind: PodDisruptionBudget
+metadata:
+  name: etcd-pdb
+  labels:
+    pdb: etcd
+spec:
+  minAvailable: 2
+  selector:
+    matchLabels:
+      app: etcd
diff --git a/test/e2e/testing-manifests/statefulset/etcd/service.yaml b/test/e2e/testing-manifests/statefulset/etcd/service.yaml
new file mode 100644
index 00000000000..3932f04f42b
--- /dev/null
+++ b/test/e2e/testing-manifests/statefulset/etcd/service.yaml
@@ -0,0 +1,17 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: etcd
+  labels:
+    app: etcd
+  annotations:
+    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
+spec:
+  ports:
+  - port: 2380
+    name: etcd-server
+  - port: 2379
+    name: etcd-client
+  clusterIP: None
+  selector:
+    app: etcd
diff --git a/test/e2e/testing-manifests/statefulset/etcd/statefulset.yaml b/test/e2e/testing-manifests/statefulset/etcd/statefulset.yaml
new file mode 100644
index 00000000000..c7800cd4e8f
--- /dev/null
+++ b/test/e2e/testing-manifests/statefulset/etcd/statefulset.yaml
@@ -0,0 +1,178 @@
+apiVersion: apps/v1beta1
+kind: StatefulSet
+metadata:
+  name: etcd
+  labels:
+    app: etcd
+spec:
+  serviceName: etcd
+  replicas: 3
+  template:
+    metadata:
+      name: etcd
+      labels:
+        app: etcd
+    spec:
+      containers:
+      - name: etcd
+        image: gcr.io/google_containers/etcd-amd64:2.2.5
+        imagePullPolicy: Always
+        ports:
+        - containerPort: 2380
+          name: peer
+        - containerPort: 2379
+          name: client
+        resources:
+          requests:
+            cpu: 100m
+            memory: 512Mi
+        env:
+        - name: INITIAL_CLUSTER_SIZE
+          value: "3"
+        - name: SET_NAME
+          value: etcd
+        volumeMounts:
+        - name: datadir
+          mountPath: /var/run/etcd
+        lifecycle:
+          preStop:
+            exec:
+              command:
+                - "/bin/sh"
+                - "-ec"
+                - |
+                  EPS=""
+                  for i in $(seq 0 
$((${INITIAL_CLUSTER_SIZE} - 1))); do + EPS="${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}:2379" + done + + HOSTNAME=$(hostname) + + member_hash() { + etcdctl member list | grep http://${HOSTNAME}.${SET_NAME}:2380 | cut -d':' -f1 | cut -d'[' -f1 + } + + echo "Removing ${HOSTNAME} from etcd cluster" + + ETCDCTL_ENDPOINT=${EPS} etcdctl member remove $(member_hash) + if [ $? -eq 0 ]; then + # Remove everything otherwise the cluster will no longer scale-up + rm -rf /var/run/etcd/* + fi + command: + - "/bin/sh" + - "-ec" + - | + HOSTNAME=$(hostname) + + # store member id into PVC for later member replacement + collect_member() { + while ! etcdctl member list &>/dev/null; do sleep 1; done + etcdctl member list | grep http://${HOSTNAME}.${SET_NAME}:2380 | cut -d':' -f1 | cut -d'[' -f1 > /var/run/etcd/member_id + exit 0 + } + + eps() { + EPS="" + for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do + EPS="${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}:2379" + done + echo ${EPS} + } + + member_hash() { + etcdctl member list | grep http://${HOSTNAME}.${SET_NAME}:2380 | cut -d':' -f1 | cut -d'[' -f1 + } + + # re-joining after failure? + if [ -e /var/run/etcd/default.etcd ]; then + echo "Re-joining etcd member" + member_id=$(cat /var/run/etcd/member_id) + + # re-join member + ETCDCTL_ENDPOINT=$(eps) etcdctl member update ${member_id} http://${HOSTNAME}.${SET_NAME}:2380 + exec etcd --name ${HOSTNAME} \ + --listen-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \ + --listen-client-urls http://${HOSTNAME}.${SET_NAME}:2379,http://127.0.0.1:2379 \ + --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \ + --data-dir /var/run/etcd/default.etcd + fi + + # etcd-SET_ID + SET_ID=${HOSTNAME:5:${#HOSTNAME}} + + # adding a new member to existing cluster (assuming all initial pods are available) + if [ "${SET_ID}" -ge ${INITIAL_CLUSTER_SIZE} ]; then + export ETCDCTL_ENDPOINT=$(eps) + + # member already added? 
+ MEMBER_HASH=$(member_hash) + if [ -n "${MEMBER_HASH}" ]; then + # the member hash exists but for some reason etcd failed + # as the datadir has not be created, we can remove the member + # and retrieve new hash + etcdctl member remove ${MEMBER_HASH} + fi + + echo "Adding new member" + etcdctl member add ${HOSTNAME} http://${HOSTNAME}.${SET_NAME}:2380 | grep "^ETCD_" > /var/run/etcd/new_member_envs + + if [ $? -ne 0 ]; then + echo "Exiting" + rm -f /var/run/etcd/new_member_envs + exit 1 + fi + + cat /var/run/etcd/new_member_envs + source /var/run/etcd/new_member_envs + + collect_member & + + exec etcd --name ${HOSTNAME} \ + --listen-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \ + --listen-client-urls http://${HOSTNAME}.${SET_NAME}:2379,http://127.0.0.1:2379 \ + --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \ + --data-dir /var/run/etcd/default.etcd \ + --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \ + --initial-cluster ${ETCD_INITIAL_CLUSTER} \ + --initial-cluster-state ${ETCD_INITIAL_CLUSTER_STATE} + fi + + for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do + while true; do + echo "Waiting for ${SET_NAME}-${i}.${SET_NAME} to come up" + ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME} > /dev/null && break + sleep 1s + done + done + + PEERS="" + for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do + PEERS="${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}:2380" + done + + collect_member & + + # join member + exec etcd --name ${HOSTNAME} \ + --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \ + --listen-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \ + --listen-client-urls http://${HOSTNAME}.${SET_NAME}:2379,http://127.0.0.1:2379 \ + --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \ + --initial-cluster-token etcd-cluster-1 \ + --initial-cluster ${PEERS} \ + --initial-cluster-state new \ + --data-dir /var/run/etcd/default.etcd + volumeClaimTemplates: + - metadata: + name: datadir + 
annotations: + volume.alpha.kubernetes.io/storage-class: anything + spec: + accessModes: + - "ReadWriteOnce" + resources: + requests: + # upstream recommended max is 700M + storage: 1Gi + diff --git a/test/e2e/testing-manifests/statefulset/etcd/tester.yaml b/test/e2e/testing-manifests/statefulset/etcd/tester.yaml new file mode 100644 index 00000000000..c5ea0b90c14 --- /dev/null +++ b/test/e2e/testing-manifests/statefulset/etcd/tester.yaml @@ -0,0 +1,24 @@ +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: etcd-test-server +spec: + replicas: 3 + template: + metadata: + labels: + app: test-server + spec: + containers: + - name: test-server + image: gcr.io/google-containers/etcd-statefulset-e2e-test:0.0 + imagePullPolicy: Always + ports: + - containerPort: 8080 + readinessProbe: + httpGet: + path: /healthz + port: 8080 + initialDelaySeconds: 2 + periodSeconds: 2 + diff --git a/test/e2e/upgrades/BUILD b/test/e2e/upgrades/BUILD index 1b0cc573751..98082c71864 100644 --- a/test/e2e/upgrades/BUILD +++ b/test/e2e/upgrades/BUILD @@ -14,6 +14,7 @@ go_library( "configmaps.go", "daemonsets.go", "deployments.go", + "etcd.go", "horizontal_pod_autoscalers.go", "ingress.go", "job.go", diff --git a/test/e2e/upgrades/etcd.go b/test/e2e/upgrades/etcd.go new file mode 100644 index 00000000000..aea9e08ec64 --- /dev/null +++ b/test/e2e/upgrades/etcd.go @@ -0,0 +1,199 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package upgrades + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "path/filepath" + "sync" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/version" + "k8s.io/kubernetes/test/e2e/framework" +) + +const manifestPath = "test/e2e/testing-manifests/statefulset/etcd" + +type EtcdUpgradeTest struct { + ip string + successfulWrites int + ssTester *framework.StatefulSetTester +} + +func (EtcdUpgradeTest) Name() string { return "etcd-upgrade" } + +func (EtcdUpgradeTest) Skip(upgCtx UpgradeContext) bool { + minVersion := version.MustParseSemantic("1.6.0") + for _, vCtx := range upgCtx.Versions { + if vCtx.Version.LessThan(minVersion) { + return true + } + } + return false +} + +func kubectlCreate(ns, file string) { + path := filepath.Join(framework.TestContext.RepoRoot, manifestPath, file) + framework.RunKubectlOrDie("create", "-f", path, fmt.Sprintf("--namespace=%s", ns)) +} + +func (t *EtcdUpgradeTest) Setup(f *framework.Framework) { + ns := f.Namespace.Name + statefulsetPoll := 30 * time.Second + statefulsetTimeout := 10 * time.Minute + t.ssTester = framework.NewStatefulSetTester(f.ClientSet) + + By("Creating a PDB") + kubectlCreate(ns, "pdb.yaml") + + By("Creating an etcd StatefulSet") + t.ssTester.CreateStatefulSet(manifestPath, ns) + + By("Creating an etcd--test-server deployment") + kubectlCreate(ns, "tester.yaml") + + By("Getting the ingress IPs from the services") + err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) { + if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" { + return false, nil + } + if _, err := t.listUsers(); err != nil { + framework.Logf("Service endpoint is up but isn't responding") + return false, nil + } + return true, nil + }) + Expect(err).NotTo(HaveOccurred()) + framework.Logf("Service endpoint is up") + + By("Adding 2 dummy users") + 
Expect(t.addUser("Alice")).NotTo(HaveOccurred()) + Expect(t.addUser("Bob")).NotTo(HaveOccurred()) + t.successfulWrites = 2 + + By("Verifying that the users exist") + users, err := t.listUsers() + Expect(err).NotTo(HaveOccurred()) + Expect(len(users)).To(Equal(2)) +} + +func (t *EtcdUpgradeTest) listUsers() ([]string, error) { + r, err := http.Get(fmt.Sprintf("http://%s:8080/list", t.ip)) + if err != nil { + return nil, err + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + return nil, fmt.Errorf(string(b)) + } + var names []string + if err := json.NewDecoder(r.Body).Decode(&names); err != nil { + return nil, err + } + return names, nil +} + +func (t *EtcdUpgradeTest) addUser(name string) error { + val := map[string][]string{"name": {name}} + r, err := http.PostForm(fmt.Sprintf("http://%s:8080/add", t.ip), val) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + return fmt.Errorf(string(b)) + } + return nil +} + +func (t *EtcdUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string { + svc, err := f.ClientSet.CoreV1().Services(ns).Get(svcName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + ingress := svc.Status.LoadBalancer.Ingress + if len(ingress) == 0 { + return "" + } + return ingress[0].IP +} + +func (t *EtcdUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) { + By("Continuously polling the database during upgrade.") + var ( + success, failures, writeAttempts, lastUserCount int + mu sync.Mutex + errors = map[string]int{} + ) + // Write loop. 
+ go wait.Until(func() { + writeAttempts++ + if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil { + framework.Logf("Unable to add user: %v", err) + mu.Lock() + errors[err.Error()]++ + mu.Unlock() + return + } + t.successfulWrites++ + }, 10*time.Millisecond, done) + // Read loop. + wait.Until(func() { + users, err := t.listUsers() + if err != nil { + framework.Logf("Could not retrieve users: %v", err) + failures++ + mu.Lock() + errors[err.Error()]++ + mu.Unlock() + return + } + success++ + lastUserCount = len(users) + }, 10*time.Millisecond, done) + framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites) + + Expect(lastUserCount >= t.successfulWrites).To(BeTrue()) + ratio := float64(success) / float64(success+failures) + framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio) + ratio = float64(t.successfulWrites) / float64(writeAttempts) + framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio) + framework.Logf("Errors: %v", errors) + // TODO(maisem): tweak this value once we have a few test runs. + Expect(ratio > 0.75).To(BeTrue()) +} + +// Teardown does one final check of the data's availability. +func (t *EtcdUpgradeTest) Teardown(f *framework.Framework) { + users, err := t.listUsers() + Expect(err).NotTo(HaveOccurred()) + Expect(len(users) >= t.successfulWrites).To(BeTrue()) +}