From 2f747f3d3c99c249131487c441c9085643c2309c Mon Sep 17 00:00:00 2001
From: Beata Skiba
Date: Thu, 20 Jul 2017 15:05:25 +0200
Subject: [PATCH] Add a simple cloud provider for e2e tests on kubemark

This is needed for cluster autoscaler e2e test to run on kubemark.
We need the ability to add and remove nodes and operate on nodegroups.
Kubemark does not provide this at the moment.
---
 hack/verify-flags/known-flags.txt  |   1 +
 pkg/kubemark/BUILD                 |   8 +
 pkg/kubemark/controller.go         | 343 +++++++++++++++++++++++++++++
 test/e2e/framework/BUILD           |   2 +
 test/e2e/framework/framework.go    |  26 +++
 test/e2e/framework/size.go         |   6 +
 test/e2e/framework/test_context.go |  18 +-
 7 files changed, 397 insertions(+), 7 deletions(-)
 create mode 100644 pkg/kubemark/controller.go

diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index a3d243b78bd..d2ee5b68bb8 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -425,6 +425,7 @@ kube-reserved
 kube-reserved-cgroup
 kube-master-url
 kube-reserved
+kubemark-external-kubeconfig
 kubernetes-anywhere-cluster
 kubernetes-anywhere-path
 kubernetes-anywhere-phase2-provider
diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD
index 1831c1c8398..70835e5c55e 100644
--- a/pkg/kubemark/BUILD
+++ b/pkg/kubemark/BUILD
@@ -10,6 +10,7 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
+        "controller.go",
         "hollow_kubelet.go",
         "hollow_proxy.go",
     ],
@@ -22,6 +23,7 @@ go_library(
         "//pkg/apis/componentconfig:go_default_library",
         "//pkg/apis/componentconfig/v1alpha1:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/controller:go_default_library",
         "//pkg/kubelet:go_default_library",
         "//pkg/kubelet/cadvisor:go_default_library",
         "//pkg/kubelet/cm:go_default_library",
@@ -44,9 +46,15 @@ go_library(
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//vendor/k8s.io/client-go/informers:go_default_library",
+        "//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
+        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
+        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/client-go/tools/record:go_default_library",
         "//vendor/k8s.io/utils/exec:go_default_library",
     ],
diff --git a/pkg/kubemark/controller.go b/pkg/kubemark/controller.go
new file mode 100644
index 00000000000..53d163175cc
--- /dev/null
+++ b/pkg/kubemark/controller.go
@@ -0,0 +1,343 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubemark
+
+import (
+    "fmt"
+    "math/rand"
+    "sync"
+    "time"
+
+    apiv1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/fields"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/client-go/informers"
+    informersv1 "k8s.io/client-go/informers/core/v1"
+    kubeclient "k8s.io/client-go/kubernetes"
+    listersv1 "k8s.io/client-go/listers/core/v1"
+    "k8s.io/client-go/tools/cache"
+    "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/controller"
+
+    "github.com/golang/glog"
+)
+
+const (
+    namespaceKubemark = "kubemark"
+    hollowNodeName    = "hollow-node"
+    nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
+    numRetries        = 3
+)
+
+// KubemarkController is a simplified cloud provider for kubemark. It allows
+// adding and deleting nodes in a kubemark cluster and introduces nodegroups
+// by applying labels to kubemark's hollow nodes.
+type KubemarkController struct {
+    nodeTemplate    *apiv1.ReplicationController
+    externalCluster externalCluster
+    kubemarkCluster kubemarkCluster
+    rand            *rand.Rand
+}
+
+// externalCluster is used to communicate with the external cluster that hosts
+// kubemark, in order to be able to list, create and delete hollow nodes
+// by manipulating the replication controllers.
+type externalCluster struct {
+    rcLister  listersv1.ReplicationControllerLister
+    rcSynced  cache.InformerSynced
+    podLister listersv1.PodLister
+    podSynced cache.InformerSynced
+    client    kubeclient.Interface
+}
+
+// kubemarkCluster is used to delete nodes from the kubemark cluster once their
+// respective replication controllers have been deleted and the nodes have
+// become unready. This covers for the fact that there is no proper cloud
+// provider for kubemark that would take care of deleting the nodes.
+type kubemarkCluster struct {
+    client            kubeclient.Interface
+    nodeLister        listersv1.NodeLister
+    nodeSynced        cache.InformerSynced
+    nodesToDelete     map[string]bool
+    nodesToDeleteLock sync.Mutex
+}
+
+// NewKubemarkController creates a KubemarkController using the provided clients to talk to the external
+// and kubemark clusters.
+func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
+    kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
+    rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
+    podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
+    controller := &KubemarkController{
+        externalCluster: externalCluster{
+            rcLister:  listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
+            rcSynced:  rcInformer.HasSynced,
+            podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
+            podSynced: podInformer.HasSynced,
+            client:    externalClient,
+        },
+        kubemarkCluster: kubemarkCluster{
+            nodeLister:        kubemarkNodeInformer.Lister(),
+            nodeSynced:        kubemarkNodeInformer.Informer().HasSynced,
+            client:            kubemarkClient,
+            nodesToDelete:     make(map[string]bool),
+            nodesToDeleteLock: sync.Mutex{},
+        },
+        rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+    }
+
+    kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
+    })
+
+    return controller, nil
+}
+
+// Init waits for population of caches and populates the node template needed
+// for creation of kubemark nodes.
+func (kubemarkController *KubemarkController) Init(stopCh chan struct{}) {
+    if !controller.WaitForCacheSync("kubemark", stopCh,
+        kubemarkController.externalCluster.rcSynced,
+        kubemarkController.externalCluster.podSynced,
+        kubemarkController.kubemarkCluster.nodeSynced) {
+        return
+    }
+
+    // Get hollow node template from an existing hollow node to be able to create
+    // new nodes based on it.
+    nodeTemplate, err := kubemarkController.getNodeTemplate()
+    if err != nil {
+        glog.Fatalf("Failed to get node template: %s", err)
+    }
+    kubemarkController.nodeTemplate = nodeTemplate
+}
+
+// GetNodeNamesForNodegroup returns the names of the nodes in the node group.
+func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup string) ([]string, error) {
+    selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
+    pods, err := kubemarkController.externalCluster.podLister.List(selector)
+    if err != nil {
+        return nil, err
+    }
+    result := make([]string, 0, len(pods))
+    for _, pod := range pods {
+        result = append(result, pod.ObjectMeta.Name)
+    }
+    return result, nil
+}
+
+// GetNodeGroupSize returns the current size of the node group.
+func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
+    selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
+    nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
+    if err != nil {
+        return 0, err
+    }
+    return len(nodes), nil
+}
+
+// SetNodeGroupSize changes the size of the node group by adding or removing nodes.
+func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
+    currSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+    if err != nil {
+        return err
+    }
+    switch delta := size - currSize; {
+    case delta < 0:
+        absDelta := -delta
+        nodes, err := kubemarkController.GetNodeNamesForNodegroup(nodeGroup)
+        if err != nil {
+            return err
+        }
+        if len(nodes) < absDelta {
+            return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes", absDelta, nodeGroup)
+        }
+        for i, node := range nodes {
+            if i == absDelta {
+                return nil
+            }
+            if err := kubemarkController.removeNodeFromNodeGroup(nodeGroup, node); err != nil {
+                return err
+            }
+        }
+    case delta > 0:
+        for i := 0; i < delta; i++ {
+            if err := kubemarkController.addNodeToNodeGroup(nodeGroup); err != nil {
+                return err
+            }
+        }
+    }
+
+    return nil
+}
+
+func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
+    templateCopy, err := api.Scheme.Copy(kubemarkController.nodeTemplate)
+    if err != nil {
+        return err
+    }
+    node := templateCopy.(*apiv1.ReplicationController)
+    node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
+    node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
+    node.Spec.Template.Labels = node.Labels
+
+    for i := 0; i < numRetries; i++ {
+        _, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(node)
+        if err == nil {
+            return nil
+        }
+    }
+
+    return err
+}
+
+func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup string, node string) error {
+    pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
+    if err != nil {
+        return err
+    }
+    for _, pod := range pods {
+        if pod.ObjectMeta.Name == node {
+            if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
+                return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
+            }
+            policy := metav1.DeletePropagationForeground
+            for i := 0; i < numRetries; i++ {
+                err := kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
+                    pod.ObjectMeta.Labels["name"],
+                    &metav1.DeleteOptions{PropagationPolicy: &policy})
+                if err == nil {
+                    glog.Infof("marking node %s for deletion", node)
+                    // Mark node for deletion from kubemark cluster.
+                    // Once it becomes unready after replication controller
+                    // deletion has been noticed, we will delete it explicitly.
+                    // This is to cover for the fact that kubemark does not
+                    // take care of this itself.
+                    kubemarkController.kubemarkCluster.markNodeForDeletion(node)
+                    return nil
+                }
+            }
+        }
+    }
+
+    return fmt.Errorf("can't delete node %s from nodegroup %s. Node does not exist", node, nodeGroup)
+}
+
+func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
+    rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
+    if err != nil {
+        return nil
+    }
+    for _, rc := range rcs {
+        if rc.ObjectMeta.Name == name {
+            return rc
+        }
+    }
+    return nil
+}
+
+func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
+    pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
+    if err != nil {
+        return "", err
+    }
+    for _, pod := range pods {
+        if pod.ObjectMeta.Name == podName {
+            return pod.Labels["name"], nil
+        }
+    }
+    return "", fmt.Errorf("pod %s not found", podName)
+}
+
+// getNodeTemplate returns the template for hollow node replication controllers
+// by looking for an existing hollow node specification. This requires at least
+// one kubemark node to be present on startup.
+func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
+    podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
+    if err != nil {
+        return nil, err
+    }
+    hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
+    if err != nil {
+        return nil, err
+    }
+    if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
+        nodeTemplate := &apiv1.ReplicationController{
+            Spec: apiv1.ReplicationControllerSpec{
+                Template: hollowNode.Spec.Template,
+            },
+        }
+
+        nodeTemplate.Spec.Selector = nil
+        nodeTemplate.Namespace = namespaceKubemark
+        one := int32(1)
+        nodeTemplate.Spec.Replicas = &one
+
+        return nodeTemplate, nil
+    }
+    return nil, fmt.Errorf("can't get hollow node template")
+}
+
+func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
+    nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
+    if err != nil {
+        return "", err
+    }
+    for _, node := range nodes {
+        return node.Name, nil
+    }
+    return "", fmt.Errorf("did not find any hollow nodes in the cluster")
+}
+
+func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
+    node, ok := newObj.(*apiv1.Node)
+    if !ok {
+        return
+    }
+    for _, condition := range node.Status.Conditions {
+        // Delete node if it is in unready state, and it has been
+        // explicitly marked for deletion.
+        if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
+            kubemarkCluster.nodesToDeleteLock.Lock()
+            defer kubemarkCluster.nodesToDeleteLock.Unlock()
+            if kubemarkCluster.nodesToDelete[node.Name] {
+                kubemarkCluster.nodesToDelete[node.Name] = false
+                if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
+                    glog.Errorf("failed to delete node %s from kubemark cluster", node.Name)
+                }
+            }
+            return
+        }
+    }
+}
+
+func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
+    kubemarkCluster.nodesToDeleteLock.Lock()
+    defer kubemarkCluster.nodesToDeleteLock.Unlock()
+    kubemarkCluster.nodesToDelete[name] = true
+}
+
+func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+    rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
+    return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
+}
+
+func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+    podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
+    return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
+}
diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index 06be9b417a8..c9d7f0de190 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -68,6 +68,7 @@ go_library(
         "//pkg/kubelet/metrics:go_default_library",
         "//pkg/kubelet/sysctl:go_default_library",
         "//pkg/kubelet/util/format:go_default_library",
+        "//pkg/kubemark:go_default_library",
         "//pkg/master/ports:go_default_library",
         "//pkg/ssh:go_default_library",
         "//pkg/util/file:go_default_library",
@@ -128,6 +129,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
        "//vendor/k8s.io/client-go/discovery:go_default_library",
        "//vendor/k8s.io/client-go/dynamic:go_default_library",
+        "//vendor/k8s.io/client-go/informers:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes/typed/authorization/v1beta1:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go
index ccf9c43fdf8..f4fa2bfa8f5 100644
--- a/test/e2e/framework/framework.go
+++ b/test/e2e/framework/framework.go
@@ -37,12 +37,15 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/dynamic"
+    "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     staging "k8s.io/client-go/kubernetes"
     clientreporestclient "k8s.io/client-go/rest"
     restclient "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/clientcmd"
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+    "k8s.io/kubernetes/pkg/kubemark"
     "k8s.io/kubernetes/test/e2e/framework/metrics"
     testutils "k8s.io/kubernetes/test/utils"

@@ -95,6 +98,8 @@ type Framework struct {
     // Place where various additional data is stored during test run to be printed to ReportDir,
     // or stdout if ReportDir is not set once test ends.
     TestSummaries []TestDataSummary
+
+    kubemarkControllerCloseChannel chan struct{}
 }

 type TestDataSummary interface {
@@ -190,6 +195,23 @@ func (f *Framework) BeforeEach() {
         f.StagingClient, err = staging.NewForConfig(clientRepoConfig)
         Expect(err).NotTo(HaveOccurred())
         f.ClientPool = dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
+        if ProviderIs("kubemark") && TestContext.KubemarkExternalKubeConfig != "" && TestContext.CloudConfig.KubemarkController == nil {
+            externalConfig, err := clientcmd.BuildConfigFromFlags("", TestContext.KubemarkExternalKubeConfig)
+            Expect(err).NotTo(HaveOccurred())
+            externalConfig.QPS = f.Options.ClientQPS
+            externalConfig.Burst = f.Options.ClientBurst
+            externalClient, err := clientset.NewForConfig(externalConfig)
+            Expect(err).NotTo(HaveOccurred())
+            f.kubemarkControllerCloseChannel = make(chan struct{})
+            externalInformerFactory := informers.NewSharedInformerFactory(externalClient, 0)
+            kubemarkInformerFactory := informers.NewSharedInformerFactory(f.ClientSet, 0)
+            kubemarkNodeInformer := kubemarkInformerFactory.Core().V1().Nodes()
+            go kubemarkNodeInformer.Informer().Run(f.kubemarkControllerCloseChannel)
+            TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(externalClient, externalInformerFactory, f.ClientSet, kubemarkNodeInformer)
+            Expect(err).NotTo(HaveOccurred())
+            externalInformerFactory.Start(f.kubemarkControllerCloseChannel)
+            TestContext.CloudConfig.KubemarkController.Init(f.kubemarkControllerCloseChannel)
+        }
     }

     if !f.SkipNamespaceCreation {
@@ -342,6 +364,10 @@ func (f *Framework) AfterEach() {
         }
     }

+    if TestContext.CloudConfig.KubemarkController != nil {
+        close(f.kubemarkControllerCloseChannel)
+    }
+
     PrintSummaries(f.TestSummaries, f.BaseName)

     // Check whether all nodes are ready after the test.
diff --git a/test/e2e/framework/size.go b/test/e2e/framework/size.go
index ec20dafbe89..7883f41fc08 100644
--- a/test/e2e/framework/size.go
+++ b/test/e2e/framework/size.go
@@ -51,6 +51,8 @@ func ResizeGroup(group string, size int32) error {
     } else if TestContext.Provider == "aws" {
         client := autoscaling.New(session.New())
         return awscloud.ResizeInstanceGroup(client, group, int(size))
+    } else if TestContext.Provider == "kubemark" {
+        return TestContext.CloudConfig.KubemarkController.SetNodeGroupSize(group, int(size))
     } else {
         return fmt.Errorf("Provider does not support InstanceGroups")
     }
@@ -72,6 +74,8 @@ func GetGroupNodes(group string) ([]string, error) {
             lines[i] = line[:strings.Index(line, " ")]
         }
         return lines, nil
+    } else if TestContext.Provider == "kubemark" {
+        return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodegroup(group)
     } else {
         return nil, fmt.Errorf("provider does not support InstanceGroups")
     }
@@ -99,6 +103,8 @@ func GroupSize(group string) (int, error) {
             return -1, fmt.Errorf("instance group not found: %s", group)
         }
         return instanceGroup.CurrentSize()
+    } else if TestContext.Provider == "kubemark" {
+        return TestContext.CloudConfig.KubemarkController.GetNodeGroupSize(group)
     } else {
         return -1, fmt.Errorf("provider does not support InstanceGroups")
     }
diff --git a/test/e2e/framework/test_context.go b/test/e2e/framework/test_context.go
index 309a372b63c..b4f87b092cb 100644
--- a/test/e2e/framework/test_context.go
+++ b/test/e2e/framework/test_context.go
@@ -27,17 +27,19 @@ import (
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/kubernetes/pkg/apis/componentconfig"
     "k8s.io/kubernetes/pkg/cloudprovider"
+    "k8s.io/kubernetes/pkg/kubemark"
 )

 const defaultHost = "http://127.0.0.1:8080"

 type TestContextType struct {
-    KubeConfig         string
-    KubeContext        string
-    KubeAPIContentType string
-    KubeVolumeDir      string
-    CertDir            string
-    Host               string
+    KubeConfig                 string
+    KubemarkExternalKubeConfig string
+    KubeContext                string
+    KubeAPIContentType         string
+    KubeVolumeDir              string
+    CertDir                    string
+    Host                       string
     // TODO: Deprecating this over time... instead just use gobindata_util.go , see #23987.
     RepoRoot                string
     DockershimCheckpointDir string
@@ -158,7 +160,8 @@ type CloudConfig struct {
     NodeTag   string
     MasterTag string

-    Provider cloudprovider.Interface
+    Provider           cloudprovider.Interface
+    KubemarkController *kubemark.KubemarkController
 }

 var TestContext TestContextType
@@ -201,6 +204,7 @@ func RegisterCommonFlags() {
 func RegisterClusterFlags() {
     flag.BoolVar(&TestContext.VerifyServiceAccount, "e2e-verify-service-account", true, "If true tests will verify the service account before running.")
     flag.StringVar(&TestContext.KubeConfig, clientcmd.RecommendedConfigPathFlag, os.Getenv(clientcmd.RecommendedConfigPathEnvVar), "Path to kubeconfig containing embedded authinfo.")
+    flag.StringVar(&TestContext.KubemarkExternalKubeConfig, fmt.Sprintf("%s-%s", "kubemark-external", clientcmd.RecommendedConfigPathFlag), "", "Path to kubeconfig containing embedded authinfo for external cluster.")
     flag.StringVar(&TestContext.KubeContext, clientcmd.FlagContext, "", "kubeconfig context to use/override. If unset, will use value from 'current-context'")
     flag.StringVar(&TestContext.KubeAPIContentType, "kube-api-content-type", "application/vnd.kubernetes.protobuf", "ContentType used to communicate with apiserver")
    flag.StringVar(&TestContext.FederatedKubeContext, "federated-kube-context", "e2e-federation", "kubeconfig context for federation.")
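Reviewer note (not part of the patch): below is a minimal usage sketch of how a test could drive the new provider through the framework helpers added in test/e2e/framework/size.go. It assumes the suite was started with --provider=kubemark and --kubemark-external-kubeconfig pointing at the kubeconfig of the external cluster, so that Framework.BeforeEach has populated TestContext.CloudConfig.KubemarkController. The package name, helper function, and nodegroup/size values are made up for illustration; only ResizeGroup, GroupSize and GetGroupNodes come from this patch.

    package autoscaling

    import (
        "fmt"

        "k8s.io/kubernetes/test/e2e/framework"
    )

    // growNodeGroup resizes a kubemark nodegroup through the provider-agnostic
    // helpers added in test/e2e/framework/size.go and reports the result.
    func growNodeGroup(group string, target int32) error {
        // For provider "kubemark" this delegates to KubemarkController.SetNodeGroupSize.
        if err := framework.ResizeGroup(group, target); err != nil {
            return err
        }
        // Delegates to KubemarkController.GetNodeGroupSize.
        size, err := framework.GroupSize(group)
        if err != nil {
            return err
        }
        // Delegates to KubemarkController.GetNodeNamesForNodegroup.
        nodes, err := framework.GetGroupNodes(group)
        if err != nil {
            return err
        }
        fmt.Printf("nodegroup %s now has %d hollow nodes: %v\n", group, size, nodes)
        return nil
    }

Note that resizing is asynchronous on the kubemark side (replication controllers are created or deleted, and nodes only disappear once they go unready), so a real test would still poll for the desired node count before asserting on it.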