1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-25 14:48:06 +00:00

unit tests and go lint fixes

Add k8s operation test
This commit is contained in:
galal-hussein
2017-11-07 17:44:17 +02:00
parent 66fd6c4eba
commit a3e0d7203b
11 changed files with 315 additions and 39 deletions

View File

@@ -5,7 +5,7 @@ ARG DAPPER_HOST_ARCH
ENV HOST_ARCH=${DAPPER_HOST_ARCH} ARCH=${DAPPER_HOST_ARCH}
RUN apt-get update && \
apt-get install -y gcc ca-certificates git wget curl vim less file && \
apt-get install -y gcc ca-certificates git wget curl vim less file kmod iptables && \
rm -f /bin/sh && ln -s /bin/bash /bin/sh
ENV GOLANG_ARCH_amd64=amd64 GOLANG_ARCH_arm=armv6l GOLANG_ARCH=GOLANG_ARCH_${ARCH} \
@@ -21,6 +21,7 @@ ENV DOCKER_URL_amd64=https://get.docker.com/builds/Linux/x86_64/docker-1.10.3 \
RUN wget -O - ${!DOCKER_URL} > /usr/bin/docker && chmod +x /usr/bin/docker
ENV DAPPER_SOURCE /go/src/github.com/rancher/rke/
ENV DAPPER_RUN_ARGS --privileged -v /var/lib/docker
ENV DAPPER_OUTPUT ./bin ./dist
ENV DAPPER_DOCKER_SOCKET true
ENV TRASH_CACHE ${DAPPER_SOURCE}/.trash-cache

View File

@@ -16,7 +16,7 @@ func SetUpAuthentication(kubeCluster, currentCluster *Cluster, authType string)
if authType == X509AuthenticationProvider {
var err error
if currentCluster != nil {
kubeCluster.Certificates, err = getClusterCerts(kubeCluster.KClient)
kubeCluster.Certificates, err = getClusterCerts(kubeCluster.KubeClient)
if err != nil {
return fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
}
@@ -34,7 +34,7 @@ func SetUpAuthentication(kubeCluster, currentCluster *Cluster, authType string)
return nil
}
func getClusterCerts(kClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) {
func getClusterCerts(kubeClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) {
logrus.Infof("[certificates] Getting Cluster certificates from Kubernetes")
certificatesNames := []string{
pki.CACertName,
@@ -47,7 +47,7 @@ func getClusterCerts(kClient *kubernetes.Clientset) (map[string]pki.CertificateP
}
certMap := make(map[string]pki.CertificatePKI)
for _, certName := range certificatesNames {
secret, err := k8s.GetSecret(kClient, certName)
secret, err := k8s.GetSecret(kubeClient, certName)
if err != nil {
return nil, err
}
@@ -64,10 +64,10 @@ func getClusterCerts(kClient *kubernetes.Clientset) (map[string]pki.CertificateP
return certMap, nil
}
func saveClusterCerts(kClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error {
func saveClusterCerts(kubeClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error {
logrus.Infof("[certificates] Save kubernetes certificates as secrets")
for crtName, crt := range crts {
err := saveCertToKubernetes(kClient, crtName, crt)
err := saveCertToKubernetes(kubeClient, crtName, crt)
if err != nil {
return fmt.Errorf("Failed to save certificate [%s] to kubernetes: %v", crtName, err)
}
@@ -76,23 +76,23 @@ func saveClusterCerts(kClient *kubernetes.Clientset, crts map[string]pki.Certifi
return nil
}
func saveCertToKubernetes(kClient *kubernetes.Clientset, crtName string, crt pki.CertificatePKI) error {
func saveCertToKubernetes(kubeClient *kubernetes.Clientset, crtName string, crt pki.CertificatePKI) error {
logrus.Debugf("[certificates] Saving certificate [%s] to kubernetes", crtName)
timeout := make(chan bool, 1)
go func() {
for {
err := k8s.UpdateSecret(kClient, "Certificate", cert.EncodeCertPEM(crt.Certificate), crtName)
err := k8s.UpdateSecret(kubeClient, "Certificate", cert.EncodeCertPEM(crt.Certificate), crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
err = k8s.UpdateSecret(kClient, "Key", cert.EncodePrivateKeyPEM(crt.Key), crtName)
err = k8s.UpdateSecret(kubeClient, "Key", cert.EncodePrivateKeyPEM(crt.Key), crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
if len(crt.Config) > 0 {
err = k8s.UpdateSecret(kClient, "Config", []byte(crt.Config), crtName)
err = k8s.UpdateSecret(kubeClient, "Config", []byte(crt.Config), crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
@@ -108,5 +108,4 @@ func saveCertToKubernetes(kClient *kubernetes.Clientset, crtName string, crt pki
case <-time.After(time.Second * KubernetesClientTimeOut):
return fmt.Errorf("[certificates] Timeout waiting for kubernetes to be ready")
}
return nil
}

View File

@@ -18,7 +18,7 @@ type Cluster struct {
EtcdHosts []hosts.Host
WorkerHosts []hosts.Host
ControlPlaneHosts []hosts.Host
KClient *kubernetes.Clientset
KubeClient *kubernetes.Clientset
KubernetesServiceIP net.IP
Certificates map[string]pki.CertificatePKI
ClusterDomain string
@@ -60,7 +60,7 @@ func ParseConfig(clusterFile string) (*Cluster, error) {
if err != nil {
return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err)
}
c.KubernetesServiceIP, err = services.GetKubernetesServiceIp(c.Services.KubeAPI.ServiceClusterIPRange)
c.KubernetesServiceIP, err = services.GetKubernetesServiceIP(c.Services.KubeAPI.ServiceClusterIPRange)
if err != nil {
return nil, fmt.Errorf("Failed to get Kubernetes Service IP: %v", err)
}

View File

@@ -16,15 +16,15 @@ import (
func (c *Cluster) SaveClusterState(clusterFile string) error {
// Reinitialize kubernetes Client
var err error
c.KClient, err = k8s.NewClient(pki.KubeAdminConfigPath)
c.KubeClient, err = k8s.NewClient(pki.KubeAdminConfigPath)
if err != nil {
return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
}
err = saveClusterCerts(c.KClient, c.Certificates)
err = saveClusterCerts(c.KubeClient, c.Certificates)
if err != nil {
return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err)
}
err = saveStateToKubernetes(c.KClient, pki.KubeAdminConfigPath, []byte(clusterFile))
err = saveStateToKubernetes(c.KubeClient, pki.KubeAdminConfigPath, []byte(clusterFile))
if err != nil {
return fmt.Errorf("[state] Failed to save configuration state: %v", err)
}
@@ -34,18 +34,18 @@ func (c *Cluster) SaveClusterState(clusterFile string) error {
func (c *Cluster) GetClusterState() (*Cluster, error) {
var err error
var currentCluster *Cluster
c.KClient, err = k8s.NewClient(pki.KubeAdminConfigPath)
c.KubeClient, err = k8s.NewClient(pki.KubeAdminConfigPath)
if err != nil {
logrus.Warnf("Failed to initiate new Kubernetes Client: %v", err)
} else {
// Handle pervious kubernetes state and certificate generation
currentCluster = getStateFromKubernetes(c.KClient, pki.KubeAdminConfigPath)
currentCluster = getStateFromKubernetes(c.KubeClient, pki.KubeAdminConfigPath)
if currentCluster != nil {
err = currentCluster.InvertIndexHosts()
if err != nil {
return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err)
}
err = hosts.ReconcileWorkers(currentCluster.WorkerHosts, c.WorkerHosts, c.KClient)
err = hosts.ReconcileWorkers(currentCluster.WorkerHosts, c.WorkerHosts, c.KubeClient)
if err != nil {
return nil, fmt.Errorf("Failed to reconcile hosts: %v", err)
}
@@ -54,12 +54,12 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
return currentCluster, nil
}
func saveStateToKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string, clusterFile []byte) error {
func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, clusterFile []byte) error {
logrus.Infof("[state] Saving cluster state to Kubernetes")
timeout := make(chan bool, 1)
go func() {
for {
err := k8s.UpdateConfigMap(kClient, clusterFile, StateConfigMapName)
err := k8s.UpdateConfigMap(kubeClient, clusterFile, StateConfigMapName)
if err != nil {
time.Sleep(time.Second * 5)
continue
@@ -77,7 +77,7 @@ func saveStateToKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string,
}
}
func getStateFromKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string) *Cluster {
func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string) *Cluster {
logrus.Infof("[state] Fetching cluster state from Kubernetes")
var cfgMap *v1.ConfigMap
var currentCluster Cluster
@@ -85,7 +85,7 @@ func getStateFromKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string
timeout := make(chan bool, 1)
go func() {
for {
cfgMap, err = k8s.GetConfigMap(kClient, StateConfigMapName)
cfgMap, err = k8s.GetConfigMap(kubeClient, StateConfigMapName)
if err != nil {
time.Sleep(time.Second * 5)
continue
@@ -107,5 +107,4 @@ func getStateFromKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string
logrus.Warnf("Timed out waiting for kubernetes cluster")
return nil
}
return nil
}

View File

@@ -46,46 +46,46 @@ func ClusterCommand() cli.Command {
func ClusterUp(clusterFile, authType string) (string, string, string, string, error) {
logrus.Infof("Building Kubernetes cluster")
var ApiURL, caCrt, clientCert, clientKey string
var APIURL, caCrt, clientCert, clientKey string
kubeCluster, err := cluster.ParseConfig(clusterFile)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.TunnelHosts()
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
currentCluster, err := kubeCluster.GetClusterState()
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
err = cluster.SetUpAuthentication(kubeCluster, currentCluster, authType)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.SetUpHosts(authType)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployClusterPlanes()
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.SaveClusterState(clusterFile)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
return APIURL, caCrt, clientCert, clientKey, err
}
ApiURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].IP + ":6443")
APIURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].IP + ":6443")
caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))
clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Certificate))
clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Key))
return ApiURL, caCrt, clientCert, clientKey, nil
return APIURL, caCrt, clientCert, clientKey, nil
}
func clusterUpFromCli(ctx *cli.Context) error {

View File

@@ -17,7 +17,7 @@ type Host struct {
DClient *client.Client
}
func ReconcileWorkers(currentWorkers []Host, newWorkers []Host, kClient *kubernetes.Clientset) error {
func ReconcileWorkers(currentWorkers []Host, newWorkers []Host, kubeClient *kubernetes.Clientset) error {
for _, currentWorker := range currentWorkers {
found := false
for _, newWorker := range newWorkers {
@@ -26,7 +26,7 @@ func ReconcileWorkers(currentWorkers []Host, newWorkers []Host, kClient *kuberne
}
}
if !found {
if err := deleteWorkerNode(&currentWorker, kClient); err != nil {
if err := deleteWorkerNode(&currentWorker, kubeClient); err != nil {
return err
}
}
@@ -34,9 +34,9 @@ func ReconcileWorkers(currentWorkers []Host, newWorkers []Host, kClient *kuberne
return nil
}
func deleteWorkerNode(workerNode *Host, kClient *kubernetes.Clientset) error {
func deleteWorkerNode(workerNode *Host, kubeClient *kubernetes.Clientset) error {
logrus.Infof("[hosts] Deleting host [%s] from the cluster", workerNode.Hostname)
err := k8s.DeleteNode(kClient, workerNode.Hostname)
err := k8s.DeleteNode(kubeClient, workerNode.Hostname)
if err != nil {
return err
}

86
k8s/k8s_test.go Normal file
View File

@@ -0,0 +1,86 @@
package k8s
import (
"testing"
check "gopkg.in/check.v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
)
const (
KubeConfigPath = "/etc/kubernetes/ssl/kubeconfig"
ConfigMapName = "testconfigmap"
ConfigYaml = `---
foo: bar
test: test123`
SecretFile = `secret123`
SecretName = "secret"
)
type KubernetesOperationsTestSuite struct {
kubeClient *kubernetes.Clientset
}
func Test(t *testing.T) {
check.TestingT(t)
}
var _ = check.Suite(&KubernetesOperationsTestSuite{})
func (k *KubernetesOperationsTestSuite) SetUpSuite(c *check.C) {
var err error
k.kubeClient, err = NewClient(KubeConfigPath)
meta := metav1.ObjectMeta{Name: metav1.NamespaceSystem}
ns := &v1.Namespace{
ObjectMeta: meta,
}
if _, err = k.kubeClient.CoreV1().Namespaces().Create(ns); err != nil {
c.Fatalf("Failed to set up test suite: %v", err)
}
}
func (k *KubernetesOperationsTestSuite) TestSaveConfig(c *check.C) {
var err error
if err != nil {
c.Fatalf("Failed to initialize kubernetes client")
}
// Make sure that config yaml file can be stored as a config map
err = UpdateConfigMap(k.kubeClient, []byte(ConfigYaml), ConfigMapName)
if err != nil {
c.Fatalf("Failed to store config map %s: %v", ConfigMapName, err)
}
cfgMap, err := GetConfigMap(k.kubeClient, ConfigMapName)
if err != nil {
c.Fatalf("Failed to fetch config map %s: %v", ConfigMapName, err)
}
if cfgMap.Data[ConfigMapName] != ConfigYaml {
c.Fatalf("Failed to verify the config map %s: %v", ConfigMapName, err)
}
}
func (k *KubernetesOperationsTestSuite) TestSaveSecret(c *check.C) {
var err error
if err != nil {
c.Fatalf("Failed to initialize kubernetes client")
}
err = UpdateSecret(k.kubeClient, SecretName, []byte(SecretFile), SecretName)
if err != nil {
c.Fatalf("Failed to store secret %s: %v", SecretName, err)
}
secret, err := GetSecret(k.kubeClient, SecretName)
if err != nil {
c.Fatalf("Failed to fetch secret %s: %v", SecretName, err)
}
if string(secret.Data[SecretName]) != SecretFile {
c.Fatalf("Failed to verify the secret %s: %v", SecretName, err)
}
}

108
pki/pki_test.go Normal file
View File

@@ -0,0 +1,108 @@
package pki
import (
"crypto/x509"
"fmt"
"net"
"testing"
"github.com/rancher/rke/hosts"
)
const (
FakeClusterDomain = "cluster.test"
FakeKubernetesServiceIP = "10.0.0.1"
)
func TestPKI(t *testing.T) {
cpHosts := []hosts.Host{
hosts.Host{
IP: "1.1.1.1",
AdvertiseAddress: "192.168.1.5",
Role: []string{"controlplane"},
Hostname: "server1",
},
}
certificateMap, err := StartCertificatesGeneration(cpHosts, cpHosts, FakeClusterDomain, net.ParseIP(FakeKubernetesServiceIP))
if err != nil {
t.Fatalf("Failed To generate certificates: %v", err)
}
assertEqual(t, certificateMap[CACertName].Certificate.IsCA, true, "")
roots := x509.NewCertPool()
roots.AddCert(certificateMap[CACertName].Certificate)
certificatesToVerify := []string{
KubeAPICertName,
KubeNodeName,
KubeProxyName,
KubeControllerName,
KubeSchedulerName,
KubeAdminCommonName,
}
opts := x509.VerifyOptions{
Roots: roots,
KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
}
for _, cert := range certificatesToVerify {
if _, err := certificateMap[cert].Certificate.Verify(opts); err != nil {
t.Fatalf("Failed to verify certificate %s: %v", cert, err)
}
}
// Test DNS ALT names
kubeAPIDNSNames := []string{
"localhost",
"kubernetes",
"kubernetes.default",
"kubernetes.default.svc",
"kubernetes.default.svc." + FakeClusterDomain,
}
for _, testDNS := range kubeAPIDNSNames {
assertEqual(
t,
stringInSlice(
testDNS,
certificateMap[KubeAPICertName].Certificate.DNSNames),
true,
fmt.Sprintf("DNS %s is not found in ALT names of Kube API certificate", testDNS))
}
// Test ALT IPs
kubeAPIAltIPs := []net.IP{
net.ParseIP("127.0.0.1"),
net.ParseIP(cpHosts[0].AdvertiseAddress),
net.ParseIP(cpHosts[0].IP),
net.ParseIP(FakeKubernetesServiceIP),
}
for _, testIP := range kubeAPIAltIPs {
found := false
for _, altIP := range certificateMap[KubeAPICertName].Certificate.IPAddresses {
if testIP.Equal(altIP) {
found = true
break
}
}
if !found {
t.Fatalf("IP Address %v is not found in ALT Ips of kube API", testIP)
}
}
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
func assertEqual(t *testing.T, a interface{}, b interface{}, message string) {
if a == b {
return
}
if len(message) == 0 {
message = fmt.Sprintf("%v != %v", a, b)
}
t.Fatal(message)
}

View File

@@ -1,10 +1,92 @@
#!/bin/bash
set -e
cleanup()
{
local exit=$?
kill $PID || true
wait $PID || true
exit $exit
}
cd $(dirname $0)/..
echo Running tests
export SOCK=/var/run/dind.sock
export DOCKER_HOST=unix://${SOCK}
export CATTLE_DOCKER_USE_BOOT2DOCKER=true
export INCLUSTER_CONFIG=false
ip link set dev eth0 mtu 1300
if grep overlay /proc/filesystems; then
STORAGE=overlay
else
STORAGE=aufs
fi
docker daemon -s $STORAGE -D -H $DOCKER_HOST >/tmp/docker.log 2>&1 &
PID=$!
trap cleanup EXIT
sleep 1
IDS=$(docker ps -qa)
if [ -n "$IDS" ]; then
docker kill $(docker ps -qa) || true
docker rm -fv $(docker ps -qa) || true
fi
export ARCH=amd64 K8S_VERSION=v1.2.4
docker run -d \
--volume=/:/rootfs:ro \
--volume=/sys:/sys:rw \
--volume=/var/lib/docker/:/var/lib/docker:rw \
--volume=/var/lib/kubelet/:/var/lib/kubelet:rw \
--volume=/var/run:/var/run:rw \
--volume=/var/run/dind.sock:/var/run/docker.sock:rw \
--net=host \
--pid=host \
--privileged \
gcr.io/google_containers/hyperkube-${ARCH}:${K8S_VERSION} \
/hyperkube kubelet \
--hostname-override=127.0.0.1 \
--api-servers=http://localhost:8080 \
--config=/etc/kubernetes/manifests \
--cluster-dns=10.0.0.10 \
--cluster-domain=cluster.local \
--allow-privileged --v=2
docker ps -a
echo "Waiting for kubernetes"
until $(curl --output /dev/null --silent --fail http://localhost:8080/api/v1/services); do
printf '.'
sleep 5
done
mkdir -p /etc/kubernetes/ssl
cat > /etc/kubernetes/ssl/kubeconfig << EOF
apiVersion: v1
kind: Config
clusters:
- cluster:
api-version: v1
insecure-skip-tls-verify: true
server: "http://localhost:8080"
name: "Default"
contexts:
- context:
cluster: "Default"
user: "Default"
name: "Default"
current-context: "Default"
users:
- name: "Default"
user:
token: "test"
EOF
PACKAGES=". $(find -name '*.go' | xargs -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin)' | sed -e 's!^!./!' -e 's!$!/...!')"
[ "${ARCH}" == "amd64" ] && RACE=-race

View File

@@ -18,7 +18,7 @@ const (
EtcdContainerName = "etcd"
)
func GetKubernetesServiceIp(serviceClusterRange string) (net.IP, error) {
func GetKubernetesServiceIP(serviceClusterRange string) (net.IP, error) {
ip, ipnet, err := net.ParseCIDR(serviceClusterRange)
if err != nil {
return nil, fmt.Errorf("Failed to get kubernetes service IP: %v", err)

View File

@@ -15,3 +15,4 @@ github.com/gogo/protobuf 117892bf1866fbaa2318c03e50e40564c8845457
github.com/opencontainers/image-spec 7c889fafd04a893f5c5f50b7ab9963d5d64e5242
github.com/pkg/errors f15c970de5b76fac0b59abb32d62c17cc7bed265
k8s.io/client-go v4.0.0 transitive=true
gopkg.in/check.v1 11d3bc7aa68e238947792f30573146a3231fc0f1