mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
kubeadm: Fix self-hosting race condition
This commit is contained in:
parent
9aa04c755f
commit
c08091699c
@ -22,6 +22,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strconv"
|
"strconv"
|
||||||
"text/template"
|
"text/template"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/renstrom/dedent"
|
"github.com/renstrom/dedent"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@ -261,8 +262,9 @@ func (i *Init) Run(out io.Writer) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("[init] Waiting for the kubelet to boot up the control plane as Static Pods from directory %q\n", kubeadmconstants.GetStaticPodDirectory())
|
fmt.Printf("[init] Waiting for the kubelet to boot up the control plane as Static Pods from directory %q\n", kubeadmconstants.GetStaticPodDirectory())
|
||||||
// TODO: Don't wait forever here
|
if err := apiclient.WaitForAPI(client, 30*time.Minute); err != nil {
|
||||||
apiclient.WaitForAPI(client)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// PHASE 4: Mark the master with the right label/taint
|
// PHASE 4: Mark the master with the right label/taint
|
||||||
if err := markmasterphase.MarkMaster(client, i.cfg.NodeName); err != nil {
|
if err := markmasterphase.MarkMaster(client, i.cfg.NodeName); err != nil {
|
||||||
|
@ -34,6 +34,11 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// selfHostingWaitTimeout describes the maximum amount of time a self-hosting wait process should wait before timing out
|
||||||
|
selfHostingWaitTimeout = 2 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
// CreateSelfHostedControlPlane is responsible for turning a Static Pod-hosted control plane to a self-hosted one
|
// CreateSelfHostedControlPlane is responsible for turning a Static Pod-hosted control plane to a self-hosted one
|
||||||
// It achieves that task this way:
|
// It achieves that task this way:
|
||||||
// 1. Load the Static Pod specification from disk (from /etc/kubernetes/manifests)
|
// 1. Load the Static Pod specification from disk (from /etc/kubernetes/manifests)
|
||||||
@ -43,7 +48,8 @@ import (
|
|||||||
// 5. Create the DaemonSet resource. Wait until the Pods are running.
|
// 5. Create the DaemonSet resource. Wait until the Pods are running.
|
||||||
// 6. Remove the Static Pod manifest file. The kubelet will stop the original Static Pod-hosted component that was running.
|
// 6. Remove the Static Pod manifest file. The kubelet will stop the original Static Pod-hosted component that was running.
|
||||||
// 7. The self-hosted containers should now step up and take over.
|
// 7. The self-hosted containers should now step up and take over.
|
||||||
// 8. In order to avoid race conditions, we're still making sure the API /healthz endpoint is healthy
|
// 8. In order to avoid race conditions, we have to make sure that static pod is deleted correctly before we continue
|
||||||
|
// Otherwise, there is a race condition when we proceed without kubelet having restarted the API server correctly and the next .Create call flakes
|
||||||
// 9. Do that for the kube-apiserver, kube-controller-manager and kube-scheduler in a loop
|
// 9. Do that for the kube-apiserver, kube-controller-manager and kube-scheduler in a loop
|
||||||
func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error {
|
func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error {
|
||||||
|
|
||||||
@ -60,6 +66,12 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
|
|||||||
start := time.Now()
|
start := time.Now()
|
||||||
manifestPath := kubeadmconstants.GetStaticPodFilepath(componentName, kubeadmconstants.GetStaticPodDirectory())
|
manifestPath := kubeadmconstants.GetStaticPodFilepath(componentName, kubeadmconstants.GetStaticPodDirectory())
|
||||||
|
|
||||||
|
// Since we want this function to be idempotent; just continue and try the next component if this file doesn't exist
|
||||||
|
if _, err := os.Stat(manifestPath); err != nil {
|
||||||
|
fmt.Printf("[self-hosted] The Static Pod for the component %q doesn't seem to be on the disk; trying the next one\n", componentName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Load the Static Pod file in order to be able to create a self-hosted variant of that file
|
// Load the Static Pod file in order to be able to create a self-hosted variant of that file
|
||||||
podSpec, err := loadPodSpecFromFile(manifestPath)
|
podSpec, err := loadPodSpecFromFile(manifestPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -75,17 +87,27 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the self-hosted component to come up
|
// Wait for the self-hosted component to come up
|
||||||
// TODO: Enforce a timeout
|
if err := apiclient.WaitForPodsWithLabel(client, selfHostingWaitTimeout, os.Stdout, buildSelfHostedWorkloadLabelQuery(componentName)); err != nil {
|
||||||
apiclient.WaitForPodsWithLabel(client, buildSelfHostedWorkloadLabelQuery(componentName))
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Remove the old Static Pod manifest
|
// Remove the old Static Pod manifest
|
||||||
if err := os.RemoveAll(manifestPath); err != nil {
|
if err := os.RemoveAll(manifestPath); err != nil {
|
||||||
return fmt.Errorf("unable to delete static pod manifest for %s [%v]", componentName, err)
|
return fmt.Errorf("unable to delete static pod manifest for %s [%v]", componentName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure the API is responsive at /healthz
|
// Wait for the mirror Pod hash to be removed; otherwise we'll run into race conditions here when the kubelet hasn't had time to
|
||||||
// TODO: Follow-up on fixing the race condition here and respect the timeout error that can be returned
|
// remove the Static Pod (or the mirror Pod respectively). This implicitely also tests that the API server endpoint is healthy,
|
||||||
apiclient.WaitForAPI(client)
|
// because this blocks until the API server returns a 404 Not Found when getting the Static Pod
|
||||||
|
staticPodName := fmt.Sprintf("%s-%s", componentName, cfg.NodeName)
|
||||||
|
if err := apiclient.WaitForStaticPodToDisappear(client, selfHostingWaitTimeout, staticPodName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just as an extra safety check; make sure the API server is returning ok at the /healthz endpoint (although we know it could return a GET answer for a Pod above)
|
||||||
|
if err := apiclient.WaitForAPI(client, selfHostingWaitTimeout); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Printf("[self-hosted] self-hosted %s ready after %f seconds\n", componentName, time.Since(start).Seconds())
|
fmt.Printf("[self-hosted] self-hosted %s ready after %f seconds\n", componentName, time.Since(start).Seconds())
|
||||||
}
|
}
|
||||||
@ -94,6 +116,7 @@ func CreateSelfHostedControlPlane(cfg *kubeadmapi.MasterConfiguration, client cl
|
|||||||
|
|
||||||
// buildDaemonSet is responsible for mutating the PodSpec and return a DaemonSet which is suitable for the self-hosting purporse
|
// buildDaemonSet is responsible for mutating the PodSpec and return a DaemonSet which is suitable for the self-hosting purporse
|
||||||
func buildDaemonSet(cfg *kubeadmapi.MasterConfiguration, name string, podSpec *v1.PodSpec) *extensions.DaemonSet {
|
func buildDaemonSet(cfg *kubeadmapi.MasterConfiguration, name string, podSpec *v1.PodSpec) *extensions.DaemonSet {
|
||||||
|
|
||||||
// Mutate the PodSpec so it's suitable for self-hosting
|
// Mutate the PodSpec so it's suitable for self-hosting
|
||||||
mutatePodSpec(cfg, name, podSpec)
|
mutatePodSpec(cfg, name, podSpec)
|
||||||
|
|
||||||
|
@ -18,20 +18,22 @@ package apiclient
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
|
// WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
|
||||||
func WaitForAPI(client clientset.Interface) {
|
func WaitForAPI(client clientset.Interface, timeout time.Duration) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
wait.PollInfinite(kubeadmconstants.APICallRetryInterval, func() (bool, error) {
|
return wait.PollImmediate(constants.APICallRetryInterval, timeout, func() (bool, error) {
|
||||||
healthStatus := 0
|
healthStatus := 0
|
||||||
client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
||||||
if healthStatus != http.StatusOK {
|
if healthStatus != http.StatusOK {
|
||||||
@ -45,22 +47,27 @@ func WaitForAPI(client clientset.Interface) {
|
|||||||
|
|
||||||
// WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
|
// WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
|
||||||
// reporting status as running.
|
// reporting status as running.
|
||||||
func WaitForPodsWithLabel(client clientset.Interface, labelKeyValPair string) {
|
func WaitForPodsWithLabel(client clientset.Interface, timeout time.Duration, out io.Writer, labelKeyValPair string) error {
|
||||||
// TODO: Implement a timeout
|
|
||||||
// TODO: Implement a verbosity switch
|
lastKnownPodNumber := -1
|
||||||
wait.PollInfinite(kubeadmconstants.APICallRetryInterval, func() (bool, error) {
|
return wait.PollImmediate(constants.APICallRetryInterval, timeout, func() (bool, error) {
|
||||||
listOpts := metav1.ListOptions{LabelSelector: labelKeyValPair}
|
listOpts := metav1.ListOptions{LabelSelector: labelKeyValPair}
|
||||||
apiPods, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(listOpts)
|
pods, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(listOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("[apiclient] Error getting Pods with label selector %q [%v]\n", labelKeyValPair, err)
|
fmt.Fprintf(out, "[apiclient] Error getting Pods with label selector %q [%v]\n", labelKeyValPair, err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(apiPods.Items) == 0 {
|
if lastKnownPodNumber != len(pods.Items) {
|
||||||
|
fmt.Fprintf(out, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), labelKeyValPair)
|
||||||
|
lastKnownPodNumber = len(pods.Items)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pods.Items) == 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
for _, pod := range apiPods.Items {
|
|
||||||
fmt.Printf("[apiclient] Pod %s status: %s\n", pod.Name, pod.Status.Phase)
|
for _, pod := range pods.Items {
|
||||||
if pod.Status.Phase != v1.PodRunning {
|
if pod.Status.Phase != v1.PodRunning {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -69,3 +76,15 @@ func WaitForPodsWithLabel(client clientset.Interface, labelKeyValPair string) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForStaticPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question
|
||||||
|
func WaitForStaticPodToDisappear(client clientset.Interface, timeout time.Duration, podName string) error {
|
||||||
|
return wait.PollImmediate(constants.APICallRetryInterval, timeout, func() (bool, error) {
|
||||||
|
_, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
fmt.Printf("[apiclient] The Static Pod %q is now removed\n", podName)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user