kubeadm: Fully implement 'kubeadm init --dry-run'

This commit is contained in:
Lucas Käldström 2017-08-25 20:31:14 +03:00
parent cb6f32e8ba
commit 2c71814344
No known key found for this signature in database
GPG Key ID: 3FA3783D77751514
14 changed files with 318 additions and 61 deletions

View File

@ -45,6 +45,7 @@ go_library(
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//cmd/kubeadm/app/util/pubkeypin:go_default_library",
"//cmd/kubeadm/app/util/token:go_default_library",

View File

@ -139,6 +139,8 @@ func NewCmdConfigUploadFromFlags(out io.Writer, kubeConfigFile *string) *cobra.C
Using from-flags, you can upload configuration to the ConfigMap in the cluster using the same flags you'd give to kubeadm init.
If you initialized your cluster using a v1.7.x or lower kubeadm client and set some flag; you need to run this command with the
same flags before upgrading to v1.8 using 'kubeadm upgrade'.
The configuration is located in the %q namespace in the %q ConfigMap
`), metav1.NamespaceSystem, constants.MasterConfigurationConfigMap),
Run: func(cmd *cobra.Command, args []string) {
var err error

View File

@ -21,6 +21,7 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"text/template"
"time"
@ -53,6 +54,7 @@ import (
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
"k8s.io/kubernetes/cmd/kubeadm/app/util/pubkeypin"
"k8s.io/kubernetes/pkg/api"
@ -264,33 +266,67 @@ func (i *Init) Run(out io.Writer) error {
return fmt.Errorf("couldn't parse kubernetes version %q: %v", i.cfg.KubernetesVersion, err)
}
// Get directories to write files to; can be faked if we're dry-running
realCertsDir := i.cfg.CertificatesDir
certsDirToWriteTo, kubeConfigDir, manifestDir, err := getDirectoriesToUse(i.dryRun, i.cfg.CertificatesDir)
if err != nil {
return err
}
// certsDirToWriteTo is gonna equal cfg.CertificatesDir in the normal case, but gonna be a temp directory if dryrunning
i.cfg.CertificatesDir = certsDirToWriteTo
adminKubeConfigPath := filepath.Join(kubeConfigDir, kubeadmconstants.AdminKubeConfigFileName)
// PHASE 1: Generate certificates
if err := certsphase.CreatePKIAssets(i.cfg); err != nil {
return err
}
// PHASE 2: Generate kubeconfig files for the admin and the kubelet
if err := kubeconfigphase.CreateInitKubeConfigFiles(kubeadmconstants.KubernetesDir, i.cfg); err != nil {
if err := kubeconfigphase.CreateInitKubeConfigFiles(kubeConfigDir, i.cfg); err != nil {
return err
}
// Temporarily set cfg.CertificatesDir to the "real value" when writing controlplane manifests
// This is needed for writing the right kind of manifests
i.cfg.CertificatesDir = realCertsDir
// PHASE 3: Bootstrap the control plane
manifestPath := kubeadmconstants.GetStaticPodDirectory()
if err := controlplanephase.CreateInitStaticPodManifestFiles(manifestPath, i.cfg); err != nil {
if err := controlplanephase.CreateInitStaticPodManifestFiles(manifestDir, i.cfg); err != nil {
return err
}
// Add etcd static pod spec only if external etcd is not configured
if len(i.cfg.Etcd.Endpoints) == 0 {
if err := etcdphase.CreateLocalEtcdStaticPodManifestFile(manifestPath, i.cfg); err != nil {
if err := etcdphase.CreateLocalEtcdStaticPodManifestFile(manifestDir, i.cfg); err != nil {
return err
}
}
client, err := createClientsetAndOptionallyWaitForReady(i.cfg, i.dryRun)
// Revert the earlier CertificatesDir assignment to the directory that can be written to
i.cfg.CertificatesDir = certsDirToWriteTo
// If we're dry-running, print the generated manifests
if err := printFilesIfDryRunning(i.dryRun, manifestDir); err != nil {
return err
}
// Create a kubernetes client and wait for the API server to be healthy (if not dryrunning)
client, err := createClient(i.cfg, i.dryRun)
if err != nil {
return err
}
// waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled
waiter := getWaiter(i.dryRun, client)
fmt.Printf("[init] Waiting for the kubelet to boot up the control plane as Static Pods from directory %q\n", kubeadmconstants.GetStaticPodDirectory())
fmt.Println("[init] This process often takes about a minute to perform or longer if the control plane images have to be pulled...")
// TODO: Adjust this timeout or start polling the kubelet API
// TODO: Make this timeout more realistic when we do create some more complex logic about the interaction with the kubelet
if err := waiter.WaitForAPI(); err != nil {
return err
}
// PHASE 4: Mark the master with the right label/taint
if err := markmasterphase.MarkMaster(client, i.cfg.NodeName); err != nil {
return err
@ -316,7 +352,7 @@ func (i *Init) Run(out io.Writer) error {
}
// Create the cluster-info ConfigMap with the associated RBAC rules
if err := clusterinfophase.CreateBootstrapConfigMapIfNotExists(client, kubeadmconstants.GetAdminKubeConfigPath()); err != nil {
if err := clusterinfophase.CreateBootstrapConfigMapIfNotExists(client, adminKubeConfigPath); err != nil {
return err
}
if err := clusterinfophase.CreateClusterInfoRBACRules(client); err != nil {
@ -347,11 +383,17 @@ func (i *Init) Run(out io.Writer) error {
// Temporary control plane is up, now we create our self hosted control
// plane components and remove the static manifests:
fmt.Println("[self-hosted] Creating self-hosted control plane...")
if err := selfhostingphase.CreateSelfHostedControlPlane(i.cfg, client); err != nil {
if err := selfhostingphase.CreateSelfHostedControlPlane(manifestDir, kubeConfigDir, i.cfg, client, waiter); err != nil {
return err
}
}
// Exit earlier if we're dryrunning
if i.dryRun {
fmt.Println("[dryrun] Finished dry-running successfully; above are the resources that would be created.")
return nil
}
// Load the CA certificate from so we can pin its public key
caCert, err := pkiutil.TryLoadCertFromDisk(i.cfg.CertificatesDir, kubeadmconstants.CACertAndKeyBaseName)
@ -362,8 +404,7 @@ func (i *Init) Run(out io.Writer) error {
}
ctx := map[string]string{
"KubeConfigPath": kubeadmconstants.GetAdminKubeConfigPath(),
"KubeConfigName": kubeadmconstants.AdminKubeConfigFileName,
"KubeConfigPath": adminKubeConfigPath,
"Token": i.cfg.Token,
"CAPubKeyPin": pubkeypin.Hash(caCert),
"MasterHostPort": masterHostPort,
@ -375,24 +416,59 @@ func (i *Init) Run(out io.Writer) error {
return initDoneTempl.Execute(out, ctx)
}
func createClientsetAndOptionallyWaitForReady(cfg *kubeadmapi.MasterConfiguration, dryRun bool) (clientset.Interface, error) {
// createClient creates a clientset.Interface object
func createClient(cfg *kubeadmapi.MasterConfiguration, dryRun bool) (clientset.Interface, error) {
if dryRun {
// If we're dry-running; we should create a faked client that answers some GETs in order to be able to do the full init flow and just logs the rest of requests
dryRunGetter := apiclient.NewInitDryRunGetter(cfg.NodeName, cfg.Networking.ServiceSubnet)
return apiclient.NewDryRunClient(dryRunGetter, os.Stdout), nil
}
// If we're acting for real,we should create a connection to the API server and wait for it to come up
client, err := kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetAdminKubeConfigPath())
if err != nil {
return nil, err
// If we're acting for real, we should create a connection to the API server and wait for it to come up
return kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetAdminKubeConfigPath())
}
// getDirectoriesToUse returns the (in order) certificates, kubeconfig and Static Pod manifest directories, followed by a possible error
// This behaves differently when dry-running vs the normal flow
func getDirectoriesToUse(dryRun bool, defaultPkiDir string) (string, string, string, error) {
if dryRun {
dryRunDir, err := ioutil.TempDir("", "kubeadm-init-dryrun")
if err != nil {
return "", "", "", fmt.Errorf("couldn't create a temporary directory: %v", err)
}
// Use the same temp dir for all
return dryRunDir, dryRunDir, dryRunDir, nil
}
fmt.Printf("[init] Waiting for the kubelet to boot up the control plane as Static Pods from directory %q\n", kubeadmconstants.GetStaticPodDirectory())
// TODO: Adjust this timeout or start polling the kubelet API
// TODO: Make this timeout more realistic when we do create some more complex logic about the interaction with the kubelet
if err := apiclient.WaitForAPI(client, 30*time.Minute); err != nil {
return nil, err
}
return client, nil
return defaultPkiDir, kubeadmconstants.KubernetesDir, kubeadmconstants.GetStaticPodDirectory(), nil
}
// printFilesIfDryRunning prints the Static Pod manifests to stdout and informs about the temporary directory to go and lookup
func printFilesIfDryRunning(dryRun bool, manifestDir string) error {
if !dryRun {
return nil
}
fmt.Printf("[dryrun] Wrote certificates, kubeconfig files and control plane manifests to %q\n", manifestDir)
fmt.Println("[dryrun] Won't print certificates or kubeconfig files due to the sensitive nature of them")
fmt.Printf("[dryrun] Please go and examine the %q directory for details about what would be written\n", manifestDir)
// Print the contents of the upgraded manifests and pretend like they were in /etc/kubernetes/manifests
files := []dryrunutil.FileToPrint{}
for _, component := range kubeadmconstants.MasterComponents {
realPath := kubeadmconstants.GetStaticPodFilepath(component, manifestDir)
outputPath := kubeadmconstants.GetStaticPodFilepath(component, kubeadmconstants.GetStaticPodDirectory())
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
}
return dryrunutil.PrintDryRunFiles(files, os.Stdout)
}
// getWaiter gets the right waiter implementation
func getWaiter(dryRun bool, client clientset.Interface) apiclient.Waiter {
if dryRun {
return dryrunutil.NewWaiter()
}
// TODO: Adjust this timeout slightly?
return apiclient.NewKubeWaiter(client, 30*time.Minute, os.Stdout)
}

View File

@ -39,6 +39,7 @@ go_library(
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//pkg/api:go_default_library",

View File

@ -17,16 +17,20 @@ limitations under the License.
package phases
import (
"os"
"strings"
"time"
"github.com/spf13/cobra"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
"k8s.io/kubernetes/pkg/api"
@ -78,7 +82,8 @@ func getSelfhostingSubCommand() *cobra.Command {
kubeadmutil.CheckErr(err)
// Converts the Static Pod-hosted control plane into a self-hosted one
err = selfhosting.CreateSelfHostedControlPlane(internalcfg, client)
waiter := apiclient.NewKubeWaiter(client, 2*time.Minute, os.Stdout)
err = selfhosting.CreateSelfHostedControlPlane(constants.GetStaticPodDirectory(), constants.KubernetesDir, internalcfg, client, waiter)
kubeadmutil.CheckErr(err)
},
}

View File

@ -54,7 +54,10 @@ const (
// 8. In order to avoid race conditions, we have to make sure that static pod is deleted correctly before we continue
// Otherwise, there is a race condition when we proceed without kubelet having restarted the API server correctly and the next .Create call flakes
// 9. Do that for the kube-apiserver, kube-controller-manager and kube-scheduler in a loop
func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error {
func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubeadmapi.MasterConfiguration, client clientset.Interface, waiter apiclient.Waiter) error {
// Adjust the timeout slightly to something self-hosting specific
waiter.SetTimeout(selfHostingWaitTimeout)
// Here the map of different mutators to use for the control plane's podspec is stored
mutators := getDefaultMutators()
@ -66,7 +69,7 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
if err := uploadTLSSecrets(client, cfg.CertificatesDir); err != nil {
return err
}
if err := uploadKubeConfigSecrets(client); err != nil {
if err := uploadKubeConfigSecrets(client, kubeConfigDir); err != nil {
return err
}
// Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them
@ -77,7 +80,7 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
for _, componentName := range kubeadmconstants.MasterComponents {
start := time.Now()
manifestPath := kubeadmconstants.GetStaticPodFilepath(componentName, kubeadmconstants.GetStaticPodDirectory())
manifestPath := kubeadmconstants.GetStaticPodFilepath(componentName, manifestsDir)
// Since we want this function to be idempotent; just continue and try the next component if this file doesn't exist
if _, err := os.Stat(manifestPath); err != nil {
@ -102,7 +105,7 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
}
// Wait for the self-hosted component to come up
if err := apiclient.WaitForPodsWithLabel(client, selfHostingWaitTimeout, os.Stdout, buildSelfHostedWorkloadLabelQuery(componentName)); err != nil {
if err := waiter.WaitForPodsWithLabel(buildSelfHostedWorkloadLabelQuery(componentName)); err != nil {
return err
}
@ -115,12 +118,12 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
// remove the Static Pod (or the mirror Pod respectively). This implicitely also tests that the API server endpoint is healthy,
// because this blocks until the API server returns a 404 Not Found when getting the Static Pod
staticPodName := fmt.Sprintf("%s-%s", componentName, cfg.NodeName)
if err := apiclient.WaitForStaticPodToDisappear(client, selfHostingWaitTimeout, staticPodName); err != nil {
if err := waiter.WaitForPodToDisappear(staticPodName); err != nil {
return err
}
// Just as an extra safety check; make sure the API server is returning ok at the /healthz endpoint (although we know it could return a GET answer for a Pod above)
if err := apiclient.WaitForAPI(client, selfHostingWaitTimeout); err != nil {
if err := waiter.WaitForAPI(); err != nil {
return err
}

View File

@ -19,7 +19,7 @@ package selfhosting
import (
"fmt"
"io/ioutil"
"path"
"path/filepath"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -184,8 +184,8 @@ func uploadTLSSecrets(client clientset.Interface, certDir string) error {
for _, tlsKeyPair := range getTLSKeyPairs() {
secret, err := createTLSSecretFromFiles(
tlsKeyPair.name,
path.Join(certDir, tlsKeyPair.cert),
path.Join(certDir, tlsKeyPair.key),
filepath.Join(certDir, tlsKeyPair.cert),
filepath.Join(certDir, tlsKeyPair.key),
)
if err != nil {
return err
@ -200,13 +200,13 @@ func uploadTLSSecrets(client clientset.Interface, certDir string) error {
return nil
}
func uploadKubeConfigSecrets(client clientset.Interface) error {
func uploadKubeConfigSecrets(client clientset.Interface, kubeConfigDir string) error {
files := []string{
kubeadmconstants.SchedulerKubeConfigFileName,
kubeadmconstants.ControllerManagerKubeConfigFileName,
}
for _, file := range files {
kubeConfigPath := path.Join(kubeadmconstants.KubernetesDir, file)
kubeConfigPath := filepath.Join(kubeConfigDir, file)
secret, err := createOpaqueSecretFromFile(file, kubeConfigPath)
if err != nil {
return err
@ -257,7 +257,7 @@ func createOpaqueSecretFromFile(secretName, file string) (*v1.Secret, error) {
},
Type: v1.SecretTypeOpaque,
Data: map[string][]byte{
path.Base(file): fileBytes,
filepath.Base(file): fileBytes,
},
}, nil
}

View File

@ -51,6 +51,7 @@ filegroup(
":package-srcs",
"//cmd/kubeadm/app/util/apiclient:all-srcs",
"//cmd/kubeadm/app/util/config:all-srcs",
"//cmd/kubeadm/app/util/dryrun:all-srcs",
"//cmd/kubeadm/app/util/kubeconfig:all-srcs",
"//cmd/kubeadm/app/util/pubkeypin:all-srcs",
"//cmd/kubeadm/app/util/staticpod:all-srcs",

View File

@ -18,13 +18,13 @@ go_library(
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/api/rbac/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",

View File

@ -23,10 +23,11 @@ import (
"io"
"strings"
"github.com/ghodss/yaml"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
core "k8s.io/client-go/testing"
)
@ -37,12 +38,18 @@ type DryRunGetter interface {
}
// MarshalFunc takes care of converting any object to a byte array for displaying the object to the user
type MarshalFunc func(runtime.Object) ([]byte, error)
type MarshalFunc func(runtime.Object, schema.GroupVersion) ([]byte, error)
// DefaultMarshalFunc is the default MarshalFunc used; uses YAML to print objects to the user
func DefaultMarshalFunc(obj runtime.Object) ([]byte, error) {
b, err := yaml.Marshal(obj)
return b, err
func DefaultMarshalFunc(obj runtime.Object, gv schema.GroupVersion) ([]byte, error) {
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(clientsetscheme.Codecs.SupportedMediaTypes(), mediaType)
if !ok {
return []byte{}, fmt.Errorf("unsupported media type %q", mediaType)
}
encoder := clientsetscheme.Codecs.EncoderForVersion(info.Serializer, gv)
return runtime.Encode(encoder, obj)
}
// DryRunClientOptions specifies options to pass to NewDryRunClientWithOpts in order to get a dryrun clientset
@ -115,10 +122,10 @@ func NewDryRunClientWithOpts(opts DryRunClientOptions) clientset.Interface {
if opts.PrintGETAndLIST {
// Print the marshalled object format with one tab indentation
objBytes, err := opts.MarshalFunc(obj)
objBytes, err := opts.MarshalFunc(obj, action.GetResource().GroupVersion())
if err == nil {
fmt.Println("[dryrun] Returning faked GET response:")
printBytesWithLinePrefix(opts.Writer, objBytes, "\t")
PrintBytesWithLinePrefix(opts.Writer, objBytes, "\t")
}
}
@ -140,10 +147,10 @@ func NewDryRunClientWithOpts(opts DryRunClientOptions) clientset.Interface {
if opts.PrintGETAndLIST {
// Print the marshalled object format with one tab indentation
objBytes, err := opts.MarshalFunc(objs)
objBytes, err := opts.MarshalFunc(objs, action.GetResource().GroupVersion())
if err == nil {
fmt.Println("[dryrun] Returning faked LIST response:")
printBytesWithLinePrefix(opts.Writer, objBytes, "\t")
PrintBytesWithLinePrefix(opts.Writer, objBytes, "\t")
}
}
@ -214,10 +221,10 @@ func logDryRunAction(action core.Action, w io.Writer, marshalFunc MarshalFunc) {
objAction, ok := action.(actionWithObject)
if ok && objAction.GetObject() != nil {
// Print the marshalled object with a tab indentation
objBytes, err := marshalFunc(objAction.GetObject())
objBytes, err := marshalFunc(objAction.GetObject(), action.GetResource().GroupVersion())
if err == nil {
fmt.Println("[dryrun] Attached object:")
printBytesWithLinePrefix(w, objBytes, "\t")
PrintBytesWithLinePrefix(w, objBytes, "\t")
}
}
@ -228,8 +235,8 @@ func logDryRunAction(action core.Action, w io.Writer, marshalFunc MarshalFunc) {
}
}
// printBytesWithLinePrefix prints objBytes to writer w with linePrefix in the beginning of every line
func printBytesWithLinePrefix(w io.Writer, objBytes []byte, linePrefix string) {
// PrintBytesWithLinePrefix prints objBytes to writer w with linePrefix in the beginning of every line
func PrintBytesWithLinePrefix(w io.Writer, objBytes []byte, linePrefix string) {
scanner := bufio.NewScanner(bytes.NewReader(objBytes))
for scanner.Scan() {
fmt.Fprintf(w, "%s%s\n", linePrefix, scanner.Text())

View File

@ -60,6 +60,8 @@ func TestLogDryRunAction(t *testing.T) {
},
}),
expectedBytes: []byte(`[dryrun] Would perform action CREATE on resource "services" in API group "core/v1"
apiVersion: v1
kind: Service
metadata:
creationTimestamp: null
name: foo

View File

@ -30,12 +30,40 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
)
// Waiter is an interface for waiting for criterias in Kubernetes to happen
type Waiter interface {
// WaitForAPI waits for the API Server's /healthz endpoint to become "ok"
WaitForAPI() error
// WaitForPodsWithLabel waits for Pods in the kube-system namespace to become Ready
WaitForPodsWithLabel(kvLabel string) error
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
WaitForPodToDisappear(staticPodName string) error
// SetTimeout adjusts the timeout to the specified duration
SetTimeout(timeout time.Duration)
}
// KubeWaiter is an implementation of Waiter that is backed by a Kubernetes client
type KubeWaiter struct {
client clientset.Interface
timeout time.Duration
writer io.Writer
}
// NewKubeWaiter returns a new Waiter object that talks to the given Kubernetes cluster
func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io.Writer) Waiter {
return &KubeWaiter{
client: client,
timeout: timeout,
writer: writer,
}
}
// WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
func WaitForAPI(client clientset.Interface, timeout time.Duration) error {
func (w *KubeWaiter) WaitForAPI() error {
start := time.Now()
return wait.PollImmediate(constants.APICallRetryInterval, timeout, func() (bool, error) {
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
healthStatus := 0
client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
@ -47,19 +75,19 @@ func WaitForAPI(client clientset.Interface, timeout time.Duration) error {
// WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
// reporting status as running.
func WaitForPodsWithLabel(client clientset.Interface, timeout time.Duration, out io.Writer, labelKeyValPair string) error {
func (w *KubeWaiter) WaitForPodsWithLabel(kvLabel string) error {
lastKnownPodNumber := -1
return wait.PollImmediate(constants.APICallRetryInterval, timeout, func() (bool, error) {
listOpts := metav1.ListOptions{LabelSelector: labelKeyValPair}
pods, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(listOpts)
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
listOpts := metav1.ListOptions{LabelSelector: kvLabel}
pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(listOpts)
if err != nil {
fmt.Fprintf(out, "[apiclient] Error getting Pods with label selector %q [%v]\n", labelKeyValPair, err)
fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err)
return false, nil
}
if lastKnownPodNumber != len(pods.Items) {
fmt.Fprintf(out, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), labelKeyValPair)
fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel)
lastKnownPodNumber = len(pods.Items)
}
@ -77,10 +105,10 @@ func WaitForPodsWithLabel(client clientset.Interface, timeout time.Duration, out
})
}
// WaitForStaticPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question
func WaitForStaticPodToDisappear(client clientset.Interface, timeout time.Duration, podName string) error {
return wait.PollImmediate(constants.APICallRetryInterval, timeout, func() (bool, error) {
_, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
// WaitForPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question
func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
fmt.Printf("[apiclient] The Static Pod %q is now removed\n", podName)
return true, nil
@ -89,6 +117,11 @@ func WaitForStaticPodToDisappear(client clientset.Interface, timeout time.Durati
})
}
// SetTimeout adjusts the timeout to the specified duration
func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
w.timeout = timeout
}
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
func TryRunCommand(f func() error, failureThreshold uint8) error {
var numFailures uint8

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["dryrun.go"],
visibility = ["//visibility:public"],
deps = [
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,100 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dryrun
import (
"fmt"
"io"
"io/ioutil"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)
// FileToPrint represents a temporary file on disk that might want to be aliased when printing
// Useful for things like loading a file from /tmp/ but saying to the user "Would write file foo to /etc/kubernetes/..."
type FileToPrint struct {
RealPath string
PrintPath string
}
// NewFileToPrint makes a new instance of FileToPrint with the specified arguments
func NewFileToPrint(realPath, printPath string) FileToPrint {
return FileToPrint{
RealPath: realPath,
PrintPath: printPath,
}
}
// PrintDryRunFiles prints the contents of the FileToPrints given to it to the writer w
func PrintDryRunFiles(files []FileToPrint, w io.Writer) error {
errs := []error{}
for _, file := range files {
if len(file.RealPath) == 0 {
continue
}
fileBytes, err := ioutil.ReadFile(file.RealPath)
if err != nil {
errs = append(errs, err)
continue
}
// Make it possible to fake the path of the file; i.e. you may want to tell the user
// "Here is what would be written to /etc/kubernetes/admin.conf", although you wrote it to /tmp/kubeadm-dryrun/admin.conf and are loading it from there
// Fall back to the "real" path if PrintPath is not set
outputFilePath := file.PrintPath
if len(outputFilePath) == 0 {
outputFilePath = file.RealPath
}
fmt.Fprintf(w, "[dryrun] Would write file %q with content:\n", outputFilePath)
apiclient.PrintBytesWithLinePrefix(w, fileBytes, "\t")
}
return errors.NewAggregate(errs)
}
// Waiter is an implementation of apiclient.Waiter that should be used for dry-running
type Waiter struct{}
// NewWaiter returns a new Waiter object that talks to the given Kubernetes cluster
func NewWaiter() apiclient.Waiter {
return &Waiter{}
}
// WaitForAPI just returns a dummy nil, to indicate that the program should just proceed
func (w *Waiter) WaitForAPI() error {
fmt.Println("[dryrun] Would wait for the API Server's /healthz endpoint to return 'ok'")
return nil
}
// WaitForPodsWithLabel just returns a dummy nil, to indicate that the program should just proceed
func (w *Waiter) WaitForPodsWithLabel(kvLabel string) error {
fmt.Printf("[dryrun] Would wait for the Pods with the label %q in the %s namespace to become Running\n", kvLabel, metav1.NamespaceSystem)
return nil
}
// WaitForPodToDisappear just returns a dummy nil, to indicate that the program should just proceed
func (w *Waiter) WaitForPodToDisappear(podName string) error {
fmt.Printf("[dryrun] Would wait for the %q Pod in the %s namespace to be deleted\n", podName, metav1.NamespaceSystem)
return nil
}
// SetTimeout is a no-op; we don't wait in this implementation
func (w *Waiter) SetTimeout(_ time.Duration) {}