[Federation] Review comment fixes for wait for control plane pods in kubefed init

This commit is contained in:
Irfan Ur Rehman 2016-12-27 22:23:46 +05:30
parent 27f7fca8e0
commit d6cfd826a3
6 changed files with 90 additions and 38 deletions

View File

@ -63,6 +63,7 @@ const (
HostClusterLocalDNSZoneName = "cluster.local." HostClusterLocalDNSZoneName = "cluster.local."
lbAddrRetryInterval = 5 * time.Second lbAddrRetryInterval = 5 * time.Second
podWaitInterval = 2 * time.Second
) )
var ( var (
@ -230,22 +231,14 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman
} }
if !dryRun { if !dryRun {
fmt.Fprintf(cmdOut, "Waiting for control plane to come up") fedPods := []string{serverName, cmName}
for !podRunning(hostClientset, serverName, initFlags.FederationSystemNamespace) { err = waitForPods(hostClientset, fedPods, initFlags.FederationSystemNamespace)
_, err := fmt.Fprintf(cmdOut, ".") if err != nil {
if err != nil { return err
return err
}
//wait indefinite if the pod doesn't show up with correct status
time.Sleep(2 * time.Second)
} }
for !podRunning(hostClientset, cmName, initFlags.FederationSystemNamespace) { err = waitSrvHealthy(config, initFlags.Name, initFlags.Kubeconfig)
_, err := fmt.Fprintf(cmdOut, ".") if err != nil {
if err != nil { return err
return err
}
//wait indefinite if the pod doesn't show up with correct status
time.Sleep(2 * time.Second)
} }
return printSuccess(cmdOut, ips, hostnames) return printSuccess(cmdOut, ips, hostnames)
} }
@ -593,19 +586,46 @@ func createControllerManager(clientset *client.Clientset, namespace, name, svcNa
return clientset.Extensions().Deployments(namespace).Create(dep) return clientset.Extensions().Deployments(namespace).Create(dep)
} }
func podRunning(clientset *client.Clientset, name, nameSpace string) bool { func waitForPods(clientset *client.Clientset, fedPods []string, namespace string) error {
podList, err := clientset.Core().Pods(nameSpace).List(api.ListOptions{}) err := wait.PollInfinite(podWaitInterval, func() (bool, error) {
if err != nil { podCheck := len(fedPods)
//Problem in getting pods at this time podList, err := clientset.Core().Pods(namespace).List(api.ListOptions{})
return false if err != nil {
} return false, nil
for _, pod := range podList.Items {
if strings.Contains(pod.Name, name) && pod.Status.Phase == "Running" {
return true
} }
for _, pod := range podList.Items {
for _, fedPod := range fedPods {
if strings.HasPrefix(pod.Name, fedPod) && pod.Status.Phase == "Running" {
podCheck -= 1
}
}
//ensure that all pods are in running state or keep waiting
if podCheck == 0 {
return true, nil
}
}
return false, nil
})
return err
}
func waitSrvHealthy(config util.AdminConfig, context, kubeconfig string) error {
fedClientSet, err := config.FederationClientset(context, kubeconfig)
if err != nil {
return err
} }
return false fedDiscoveryClient := fedClientSet.Discovery()
err = wait.PollInfinite(podWaitInterval, func() (bool, error) {
body, err := fedDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw()
if err != nil {
return false, nil
}
if strings.EqualFold(string(body), "ok") {
return true, nil
}
return false, nil
})
return err
} }
func printSuccess(cmdOut io.Writer, ips, hostnames []string) error { func printSuccess(cmdOut io.Writer, ips, hostnames []string) error {

View File

@ -168,7 +168,7 @@ func TestInitFederation(t *testing.T) {
want = fmt.Sprintf("Federation control plane runs (dry run)\n") want = fmt.Sprintf("Federation control plane runs (dry run)\n")
} }
if got := buf.String(); !strings.Contains(got, want) { if got := buf.String(); got != want {
t.Errorf("[%d] unexpected output: got: %s, want: %s", i, got, want) t.Errorf("[%d] unexpected output: got: %s, want: %s", i, got, want)
if cmdErrMsg != "" { if cmdErrMsg != "" {
t.Errorf("[%d] unexpected error message: %s", i, cmdErrMsg) t.Errorf("[%d] unexpected error message: %s", i, cmdErrMsg)
@ -708,7 +708,7 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image,
}, },
} }
ctrlMgrPod := v1.Pod{ cmPod := v1.Pod{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Pod", Kind: "Pod",
APIVersion: testapi.Extensions.GroupVersion().String(), APIVersion: testapi.Extensions.GroupVersion().String(),
@ -723,7 +723,7 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image,
} }
podList.Items = append(podList.Items, apiServerPod) podList.Items = append(podList.Items, apiServerPod)
podList.Items = append(podList.Items, ctrlMgrPod) podList.Items = append(podList.Items, cmPod)
f, tf, codec, _ := cmdtesting.NewAPIFactory() f, tf, codec, _ := cmdtesting.NewAPIFactory()
extCodec := testapi.Extensions.Codec() extCodec := testapi.Extensions.Codec()
@ -733,6 +733,8 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image,
NegotiatedSerializer: ns, NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; { switch p, m := req.URL.Path, req.Method; {
case p == "/healthz":
return &http.Response{StatusCode: http.StatusOK, Header: kubefedtesting.DefaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte("ok")))}, nil
case p == "/api/v1/namespaces" && m == http.MethodPost: case p == "/api/v1/namespaces" && m == http.MethodPost:
body, err := ioutil.ReadAll(req.Body) body, err := ioutil.ReadAll(req.Body)
if err != nil { if err != nil {
@ -828,9 +830,7 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image,
return &http.Response{StatusCode: http.StatusCreated, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(extCodec, &want)}, nil return &http.Response{StatusCode: http.StatusCreated, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(extCodec, &want)}, nil
case p == "/api/v1/namespaces/federation-system/pods" && m == http.MethodGet: case p == "/api/v1/namespaces/federation-system/pods" && m == http.MethodGet:
return &http.Response{StatusCode: http.StatusOK, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(codec, &podList)}, nil return &http.Response{StatusCode: http.StatusOK, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(codec, &podList)}, nil
default: default:
fmt.Println("Unknon api called %v\n", p)
return nil, fmt.Errorf("unexpected request: %#v\n%#v", req.URL, req) return nil, fmt.Errorf("unexpected request: %#v\n%#v", req.URL, req)
} }
}), }),

View File

@ -12,6 +12,7 @@ go_library(
srcs = ["testing.go"], srcs = ["testing.go"],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/kubefed/util:go_default_library", "//federation/pkg/kubefed/util:go_default_library",
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/apimachinery/registered:go_default_library", "//pkg/apimachinery/registered:go_default_library",

View File

@ -23,6 +23,7 @@ import (
"net/http" "net/http"
"os" "os"
fedclient "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/kubefed/util" "k8s.io/kubernetes/federation/pkg/kubefed/util"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
@ -53,6 +54,18 @@ func (f *fakeAdminConfig) PathOptions() *clientcmd.PathOptions {
return f.pathOptions return f.pathOptions
} }
func (f *fakeAdminConfig) FederationClientset(context, kubeconfigPath string) (*fedclient.Clientset, error) {
fakeRestClient, err := f.hostFactory.RESTClient()
if err != nil {
return nil, err
}
// we ignore the function params and use the client from
// the same fakefactory to create a federation clientset
// our fake factory exposes only the healthz api for this client
return fedclient.New(fakeRestClient), nil
}
func (f *fakeAdminConfig) HostFactory(host, kubeconfigPath string) cmdutil.Factory { func (f *fakeAdminConfig) HostFactory(host, kubeconfigPath string) cmdutil.Factory {
return f.hostFactory return f.hostFactory
} }

View File

@ -12,6 +12,7 @@ go_library(
srcs = ["util.go"], srcs = ["util.go"],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library",

View File

@ -17,6 +17,7 @@ limitations under the License.
package util package util
import ( import (
fedclient "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
@ -39,13 +40,16 @@ const (
// AdminConfig provides a filesystem based kubeconfig (via // AdminConfig provides a filesystem based kubeconfig (via
// `PathOptions()`) and a mechanism to talk to the federation // `PathOptions()`) and a mechanism to talk to the federation
// host cluster. // host cluster and the federation control plane api server.
type AdminConfig interface { type AdminConfig interface {
// PathOptions provides filesystem based kubeconfig access. // PathOptions provides filesystem based kubeconfig access.
PathOptions() *clientcmd.PathOptions PathOptions() *clientcmd.PathOptions
// FedClientSet provides a federation API compliant clientset
// to communicate with the federation control plane api server
FederationClientset(context, kubeconfigPath string) (*fedclient.Clientset, error)
// HostFactory provides a mechanism to communicate with the // HostFactory provides a mechanism to communicate with the
// cluster where federation control plane is hosted. // cluster where federation control plane is hosted.
HostFactory(host, kubeconfigPath string) cmdutil.Factory HostFactory(hostcontext, kubeconfigPath string) cmdutil.Factory
} }
// adminConfig implements the AdminConfig interface. // adminConfig implements the AdminConfig interface.
@ -64,17 +68,30 @@ func (a *adminConfig) PathOptions() *clientcmd.PathOptions {
return a.pathOptions return a.pathOptions
} }
func (a *adminConfig) HostFactory(host, kubeconfigPath string) cmdutil.Factory { func (a *adminConfig) FederationClientset(context, kubeconfigPath string) (*fedclient.Clientset, error) {
fedConfig := a.getClientConfig(context, kubeconfigPath)
fedClientConfig, err := fedConfig.ClientConfig()
if err != nil {
return nil, err
}
return fedclient.NewForConfigOrDie(fedClientConfig), nil
}
func (a *adminConfig) HostFactory(hostcontext, kubeconfigPath string) cmdutil.Factory {
hostClientConfig := a.getClientConfig(hostcontext, kubeconfigPath)
return cmdutil.NewFactory(hostClientConfig)
}
func (a *adminConfig) getClientConfig(context, kubeconfigPath string) clientcmd.ClientConfig {
loadingRules := *a.pathOptions.LoadingRules loadingRules := *a.pathOptions.LoadingRules
loadingRules.Precedence = a.pathOptions.GetLoadingPrecedence() loadingRules.Precedence = a.pathOptions.GetLoadingPrecedence()
loadingRules.ExplicitPath = kubeconfigPath loadingRules.ExplicitPath = kubeconfigPath
overrides := &clientcmd.ConfigOverrides{ overrides := &clientcmd.ConfigOverrides{
CurrentContext: host, CurrentContext: context,
} }
hostClientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&loadingRules, overrides) return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&loadingRules, overrides)
return cmdutil.NewFactory(hostClientConfig)
} }
// SubcommandFlags holds the flags required by the subcommands of // SubcommandFlags holds the flags required by the subcommands of