kubeadm: remove the ClusterStatus object from v1beta3

- Remove the object from v1beta3 and the internal type
- Deprecate a couple of phases that were specifically designed / named to
modify the ClusterStatus object
- Adapt logic around annotation vs ClusterStatus retrieval
- Update unit tests
- Run generators
This commit is contained in:
Lubomir I. Ivanov 2021-05-11 21:07:36 +03:00
parent 382a33986b
commit 8b9d0dceb1
19 changed files with 62 additions and 596 deletions

View File

@ -48,7 +48,6 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&InitConfiguration{},
&ClusterConfiguration{},
&ClusterStatus{},
&JoinConfiguration{},
)
return nil

View File

@ -179,18 +179,6 @@ type ImageMeta struct {
//TODO: evaluate if we need also a ImageName based on user feedbacks
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// ClusterStatus contains the cluster status. The ClusterStatus will be stored in the kubeadm-config
// ConfigMap in the cluster, and then updated by kubeadm when additional control plane instance joins or leaves the cluster.
type ClusterStatus struct {
metav1.TypeMeta
// APIEndpoints currently available in the cluster, one for each control plane/api server instance.
// The key of the map is the IP of the host's default interface
APIEndpoints map[string]APIEndpoint
}
// APIEndpoint struct contains elements of API server instance deployed on a node.
type APIEndpoint struct {
// AdvertiseAddress sets the IP address for the API server to advertise.

View File

@ -92,16 +92,6 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ClusterStatus)(nil), (*kubeadm.ClusterStatus)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta2_ClusterStatus_To_kubeadm_ClusterStatus(a.(*ClusterStatus), b.(*kubeadm.ClusterStatus), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*kubeadm.ClusterStatus)(nil), (*ClusterStatus)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_kubeadm_ClusterStatus_To_v1beta2_ClusterStatus(a.(*kubeadm.ClusterStatus), b.(*ClusterStatus), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ControlPlaneComponent)(nil), (*kubeadm.ControlPlaneComponent)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta2_ControlPlaneComponent_To_kubeadm_ControlPlaneComponent(a.(*ControlPlaneComponent), b.(*kubeadm.ControlPlaneComponent), scope)
}); err != nil {
@ -442,26 +432,6 @@ func Convert_kubeadm_ClusterConfiguration_To_v1beta2_ClusterConfiguration(in *ku
return autoConvert_kubeadm_ClusterConfiguration_To_v1beta2_ClusterConfiguration(in, out, s)
}
func autoConvert_v1beta2_ClusterStatus_To_kubeadm_ClusterStatus(in *ClusterStatus, out *kubeadm.ClusterStatus, s conversion.Scope) error {
out.APIEndpoints = *(*map[string]kubeadm.APIEndpoint)(unsafe.Pointer(&in.APIEndpoints))
return nil
}
// Convert_v1beta2_ClusterStatus_To_kubeadm_ClusterStatus is an autogenerated conversion function.
func Convert_v1beta2_ClusterStatus_To_kubeadm_ClusterStatus(in *ClusterStatus, out *kubeadm.ClusterStatus, s conversion.Scope) error {
return autoConvert_v1beta2_ClusterStatus_To_kubeadm_ClusterStatus(in, out, s)
}
func autoConvert_kubeadm_ClusterStatus_To_v1beta2_ClusterStatus(in *kubeadm.ClusterStatus, out *ClusterStatus, s conversion.Scope) error {
out.APIEndpoints = *(*map[string]APIEndpoint)(unsafe.Pointer(&in.APIEndpoints))
return nil
}
// Convert_kubeadm_ClusterStatus_To_v1beta2_ClusterStatus is an autogenerated conversion function.
func Convert_kubeadm_ClusterStatus_To_v1beta2_ClusterStatus(in *kubeadm.ClusterStatus, out *ClusterStatus, s conversion.Scope) error {
return autoConvert_kubeadm_ClusterStatus_To_v1beta2_ClusterStatus(in, out, s)
}
func autoConvert_v1beta2_ControlPlaneComponent_To_kubeadm_ControlPlaneComponent(in *ControlPlaneComponent, out *kubeadm.ControlPlaneComponent, s conversion.Scope) error {
out.ExtraArgs = *(*map[string]string)(unsafe.Pointer(&in.ExtraArgs))
out.ExtraVolumes = *(*[]kubeadm.HostPathMount)(unsafe.Pointer(&in.ExtraVolumes))

View File

@ -60,7 +60,6 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&InitConfiguration{},
&ClusterConfiguration{},
&ClusterStatus{},
&JoinConfiguration{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)

View File

@ -153,18 +153,6 @@ type ImageMeta struct {
//TODO: evaluate if we need also a ImageName based on user feedbacks
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// ClusterStatus contains the cluster status. The ClusterStatus will be stored in the kubeadm-config
// ConfigMap in the cluster, and then updated by kubeadm when additional control plane instance joins or leaves the cluster.
type ClusterStatus struct {
metav1.TypeMeta `json:",inline"`
// APIEndpoints currently available in the cluster, one for each control plane/api server instance.
// The key of the map is the IP of the host's default interface
APIEndpoints map[string]APIEndpoint `json:"apiEndpoints"`
}
// APIEndpoint struct contains elements of API server instance deployed on a node.
type APIEndpoint struct {
// AdvertiseAddress sets the IP address for the API server to advertise.

View File

@ -92,16 +92,6 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ClusterStatus)(nil), (*kubeadm.ClusterStatus)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta3_ClusterStatus_To_kubeadm_ClusterStatus(a.(*ClusterStatus), b.(*kubeadm.ClusterStatus), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*kubeadm.ClusterStatus)(nil), (*ClusterStatus)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_kubeadm_ClusterStatus_To_v1beta3_ClusterStatus(a.(*kubeadm.ClusterStatus), b.(*ClusterStatus), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ControlPlaneComponent)(nil), (*kubeadm.ControlPlaneComponent)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta3_ControlPlaneComponent_To_kubeadm_ControlPlaneComponent(a.(*ControlPlaneComponent), b.(*kubeadm.ControlPlaneComponent), scope)
}); err != nil {
@ -441,26 +431,6 @@ func Convert_kubeadm_ClusterConfiguration_To_v1beta3_ClusterConfiguration(in *ku
return autoConvert_kubeadm_ClusterConfiguration_To_v1beta3_ClusterConfiguration(in, out, s)
}
func autoConvert_v1beta3_ClusterStatus_To_kubeadm_ClusterStatus(in *ClusterStatus, out *kubeadm.ClusterStatus, s conversion.Scope) error {
out.APIEndpoints = *(*map[string]kubeadm.APIEndpoint)(unsafe.Pointer(&in.APIEndpoints))
return nil
}
// Convert_v1beta3_ClusterStatus_To_kubeadm_ClusterStatus is an autogenerated conversion function.
func Convert_v1beta3_ClusterStatus_To_kubeadm_ClusterStatus(in *ClusterStatus, out *kubeadm.ClusterStatus, s conversion.Scope) error {
return autoConvert_v1beta3_ClusterStatus_To_kubeadm_ClusterStatus(in, out, s)
}
func autoConvert_kubeadm_ClusterStatus_To_v1beta3_ClusterStatus(in *kubeadm.ClusterStatus, out *ClusterStatus, s conversion.Scope) error {
out.APIEndpoints = *(*map[string]APIEndpoint)(unsafe.Pointer(&in.APIEndpoints))
return nil
}
// Convert_kubeadm_ClusterStatus_To_v1beta3_ClusterStatus is an autogenerated conversion function.
func Convert_kubeadm_ClusterStatus_To_v1beta3_ClusterStatus(in *kubeadm.ClusterStatus, out *ClusterStatus, s conversion.Scope) error {
return autoConvert_kubeadm_ClusterStatus_To_v1beta3_ClusterStatus(in, out, s)
}
func autoConvert_v1beta3_ControlPlaneComponent_To_kubeadm_ControlPlaneComponent(in *ControlPlaneComponent, out *kubeadm.ControlPlaneComponent, s conversion.Scope) error {
out.ExtraArgs = *(*map[string]string)(unsafe.Pointer(&in.ExtraArgs))
out.ExtraVolumes = *(*[]kubeadm.HostPathMount)(unsafe.Pointer(&in.ExtraVolumes))

View File

@ -184,38 +184,6 @@ func (in *ClusterConfiguration) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.APIEndpoints != nil {
in, out := &in.APIEndpoints, &out.APIEndpoints
*out = make(map[string]APIEndpoint, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterStatus.
func (in *ClusterStatus) DeepCopy() *ClusterStatus {
if in == nil {
return nil
}
out := new(ClusterStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ClusterStatus) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ControlPlaneComponent) DeepCopyInto(out *ControlPlaneComponent) {
*out = *in

View File

@ -29,7 +29,6 @@ import (
// All generated defaulters are covering - they call all nested defaulters.
func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&ClusterConfiguration{}, func(obj interface{}) { SetObjectDefaults_ClusterConfiguration(obj.(*ClusterConfiguration)) })
scheme.AddTypeDefaultingFunc(&ClusterStatus{}, func(obj interface{}) { SetObjectDefaults_ClusterStatus(obj.(*ClusterStatus)) })
scheme.AddTypeDefaultingFunc(&InitConfiguration{}, func(obj interface{}) { SetObjectDefaults_InitConfiguration(obj.(*InitConfiguration)) })
scheme.AddTypeDefaultingFunc(&JoinConfiguration{}, func(obj interface{}) { SetObjectDefaults_JoinConfiguration(obj.(*JoinConfiguration)) })
return nil
@ -40,9 +39,6 @@ func SetObjectDefaults_ClusterConfiguration(in *ClusterConfiguration) {
SetDefaults_APIServer(&in.APIServer)
}
func SetObjectDefaults_ClusterStatus(in *ClusterStatus) {
}
func SetObjectDefaults_InitConfiguration(in *InitConfiguration) {
SetDefaults_InitConfiguration(in)
for i := range in.BootstrapTokens {

View File

@ -191,38 +191,6 @@ func (in *ClusterConfiguration) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.APIEndpoints != nil {
in, out := &in.APIEndpoints, &out.APIEndpoints
*out = make(map[string]APIEndpoint, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterStatus.
func (in *ClusterStatus) DeepCopy() *ClusterStatus {
if in == nil {
return nil
}
out := new(ClusterStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ClusterStatus) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in ComponentConfigMap) DeepCopyInto(out *ComponentConfigMap) {
{

View File

@ -28,7 +28,6 @@ import (
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
markcontrolplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/markcontrolplane"
uploadconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
)
@ -87,8 +86,7 @@ func newUpdateStatusSubphase() workflow.Phase {
return workflow.Phase{
Name: "update-status",
Short: fmt.Sprintf(
"Register the new control-plane node into the %s maintained in the %s ConfigMap",
kubeadmconstants.ClusterStatusConfigMapKey,
"Register the new control-plane node into the ClusterStatus maintained in the %s ConfigMap (DEPRECATED)",
kubeadmconstants.KubeadmConfigConfigMap,
),
Run: runUpdateStatusPhase,
@ -160,24 +158,10 @@ func runUpdateStatusPhase(c workflow.RunData) error {
return errors.New("control-plane-join phase invoked with an invalid data struct")
}
if data.Cfg().ControlPlane == nil {
return nil
if data.Cfg().ControlPlane != nil {
fmt.Println("The 'update-status' phase is deprecated and will be removed in a future release. " +
"Currently it performs no operation")
}
// gets access to the cluster using the identity defined in admin.conf
client, err := data.ClientSet()
if err != nil {
return errors.Wrap(err, "couldn't create Kubernetes client")
}
cfg, err := data.InitCfg()
if err != nil {
return err
}
if err := uploadconfigphase.UploadConfiguration(cfg, client); err != nil {
return errors.Wrap(err, "error uploading configuration")
}
return nil
}

View File

@ -18,19 +18,18 @@ package phases
import (
"errors"
"fmt"
"os"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
)
// NewUpdateClusterStatus creates a kubeadm workflow phase for update-cluster-status
func NewUpdateClusterStatus() workflow.Phase {
return workflow.Phase{
Name: "update-cluster-status",
Short: "Remove this node from the ClusterStatus object.",
Long: "Remove this node from the ClusterStatus object if the node is a control plane node.",
Short: "Remove this node from the ClusterStatus object (DEPRECATED).",
Run: runUpdateClusterStatus,
}
}
@ -41,14 +40,11 @@ func runUpdateClusterStatus(c workflow.RunData) error {
return errors.New("update-cluster-status phase invoked with an invalid data struct")
}
// Reset the ClusterStatus for a given control-plane node.
cfg := r.Cfg()
if isControlPlane() && cfg != nil {
if err := uploadconfig.ResetClusterStatusForNode(cfg.NodeRegistration.Name, r.Client()); err != nil {
return err
}
fmt.Println("The 'update-cluster-status' phase is deprecated and will be removed in a future release. " +
"Currently it performs no operation")
}
return nil
}

View File

@ -248,9 +248,6 @@ const (
// ClusterConfigurationConfigMapKey specifies in what ConfigMap key the cluster configuration should be stored
ClusterConfigurationConfigMapKey = "ClusterConfiguration"
// ClusterStatusConfigMapKey specifies in what ConfigMap key the cluster status should be stored
ClusterStatusConfigMapKey = "ClusterStatus"
// KubeProxyConfigMap specifies in what ConfigMap in the kube-system namespace the kube-proxy configuration should be stored
KubeProxyConfigMap = "kube-proxy"

View File

@ -19,12 +19,10 @@ package uploadconfig
import (
"fmt"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
@ -38,30 +36,6 @@ const (
NodesKubeadmConfigClusterRoleName = "kubeadm:nodes-kubeadm-config"
)
// ResetClusterStatusForNode removes the APIEndpoint of a given control-plane node
// from the ClusterStatus and updates the kubeadm ConfigMap
func ResetClusterStatusForNode(nodeName string, client clientset.Interface) error {
	fmt.Printf("[reset] Removing info for node %q from the ConfigMap %q in the %q Namespace\n",
		nodeName, kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
	// Read-modify-write the kubeadm-config ConfigMap in the kube-system namespace.
	return apiclient.MutateConfigMap(client, metav1.ObjectMeta{
		Name:      kubeadmconstants.KubeadmConfigConfigMap,
		Namespace: metav1.NamespaceSystem,
	}, func(cm *v1.ConfigMap) error {
		return mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error {
			// Handle a nil APIEndpoints map. Should only happen if someone manually
			// interacted with the ConfigMap.
			if cs.APIEndpoints == nil {
				return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil",
					kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
			}
			klog.V(2).Infof("Removing APIEndpoint for Node %q", nodeName)
			// Drop only this node's entry; endpoints of other control-plane nodes are kept.
			delete(cs.APIEndpoints, nodeName)
			return nil
		})
	})
}
// UploadConfiguration saves the InitConfiguration used for later reference (when upgrading for instance)
func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Interface) error {
fmt.Printf("[upload-config] Storing the configuration used in ConfigMap %q in the %q Namespace\n", kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
@ -78,18 +52,6 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
return err
}
// Prepare the ClusterStatus for upload
clusterStatus := &kubeadmapi.ClusterStatus{
APIEndpoints: map[string]kubeadmapi.APIEndpoint{
cfg.NodeRegistration.Name: cfg.LocalAPIEndpoint,
},
}
// Marshal the ClusterStatus into YAML
clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus)
if err != nil {
return err
}
err = apiclient.CreateOrMutateConfigMap(client, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: kubeadmconstants.KubeadmConfigConfigMap,
@ -97,23 +59,12 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
},
Data: map[string]string{
kubeadmconstants.ClusterConfigurationConfigMapKey: string(clusterConfigurationYaml),
kubeadmconstants.ClusterStatusConfigMapKey: string(clusterStatusYaml),
},
}, func(cm *v1.ConfigMap) error {
// Upgrade will call to UploadConfiguration with a modified KubernetesVersion reflecting the new
// Kubernetes version. In that case, the mutation path will take place.
cm.Data[kubeadmconstants.ClusterConfigurationConfigMapKey] = string(clusterConfigurationYaml)
// Mutate the ClusterStatus now
return mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error {
// Handle a nil APIEndpoints map. Should only happen if someone manually
// interacted with the ConfigMap.
if cs.APIEndpoints == nil {
return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil",
kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
}
cs.APIEndpoints[cfg.NodeRegistration.Name] = cfg.LocalAPIEndpoint
return nil
})
return nil
})
if err != nil {
return err
@ -163,23 +114,3 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
},
})
}
// mutateClusterStatus reads the ClusterStatus out of the given kubeadm-config
// ConfigMap, applies the provided mutator to it, and writes the mutated status
// back into the ConfigMap data under the ClusterStatus key as YAML.
func mutateClusterStatus(cm *v1.ConfigMap, mutator func(*kubeadmapi.ClusterStatus) error) error {
	// Obtain the existing ClusterStatus object
	clusterStatus, err := configutil.UnmarshalClusterStatus(cm.Data)
	if err != nil {
		return err
	}
	// Mutate the ClusterStatus
	if err := mutator(clusterStatus); err != nil {
		return err
	}
	// Marshal the ClusterStatus back into YAML
	clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus)
	if err != nil {
		return err
	}
	// Write the marshaled mutated cluster status back to the ConfigMap
	cm.Data[kubeadmconstants.ClusterStatusConfigMapKey] = string(clusterStatusYaml)
	return nil
}

View File

@ -21,7 +21,6 @@ import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientsetfake "k8s.io/client-go/kubernetes/fake"
@ -58,12 +57,6 @@ func TestUploadConfiguration(t *testing.T) {
cfg.NodeRegistration.Name = "node-foo"
cfg.NodeRegistration.CRISocket = kubeadmconstants.UnknownCRISocket
status := &kubeadmapi.ClusterStatus{
APIEndpoints: map[string]kubeadmapi.APIEndpoint{
"node-foo": cfg.LocalAPIEndpoint,
},
}
client := clientsetfake.NewSimpleClientset()
// For idempotent test, we check the result of the second call.
if err := UploadConfiguration(cfg, client); err != nil {
@ -99,54 +92,7 @@ func TestUploadConfiguration(t *testing.T) {
if !reflect.DeepEqual(decodedCfg, &cfg.ClusterConfiguration) {
t2.Errorf("the initial and decoded ClusterConfiguration didn't match:\n%#v\n===\n%#v", decodedCfg, &cfg.ClusterConfiguration)
}
statusData := controlPlaneCfg.Data[kubeadmconstants.ClusterStatusConfigMapKey]
if statusData == "" {
t2.Fatal("failed to find ClusterStatusConfigMapKey key")
}
decodedStatus := &kubeadmapi.ClusterStatus{}
if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(statusData), decodedStatus); err != nil {
t2.Fatalf("unable to decode status from bytes: %v", err)
}
if !reflect.DeepEqual(decodedStatus, status) {
t2.Error("the initial and decoded ClusterStatus didn't match")
}
}
})
}
}
// TestMutateClusterStatus checks that mutateClusterStatus round-trips a mutation:
// endpoints written by the mutator must be readable back from the ConfigMap data.
func TestMutateClusterStatus(t *testing.T) {
	// Start from a ConfigMap whose ClusterStatus entry is empty.
	cm := &v1.ConfigMap{
		Data: map[string]string{
			kubeadmconstants.ClusterStatusConfigMapKey: "",
		},
	}
	endpoints := map[string]kubeadmapi.APIEndpoint{
		"some-node": {
			AdvertiseAddress: "127.0.0.1",
			BindPort:         6443,
		},
	}
	// Replace the (empty) status with one carrying the endpoints above.
	err := mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error {
		cs.APIEndpoints = endpoints
		return nil
	})
	if err != nil {
		t.Fatalf("could not mutate cluster status: %v", err)
	}
	// Try to unmarshal the cluster status back and compare with the original mutated structure
	cs, err := configutil.UnmarshalClusterStatus(cm.Data)
	if err != nil {
		t.Fatalf("could not unmarshal cluster status: %v", err)
	}
	if !reflect.DeepEqual(cs.APIEndpoints, endpoints) {
		t.Fatalf("mutation of cluster status failed: %v", err)
	}
}

View File

@ -26,7 +26,6 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
@ -41,25 +40,6 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)
// unretriableError is an error used temporarily while we are migrating from the
// ClusterStatus struct to an annotation Pod based information. When performing
// the upgrade of all control plane nodes with `kubeadm upgrade apply` and
// `kubeadm upgrade node` we don't want to retry as if we were hitting connectivity
// issues when the pod annotation is missing on the API server pods. This error will
// be used in such scenario, for failing fast, and falling back to the ClusterStatus
// retrieval in those cases.
type unretriableError struct {
	err error
}

// newUnretriableError wraps err in an unretriableError.
func newUnretriableError(err error) *unretriableError {
	return &unretriableError{err: err}
}

// Error implements the error interface, prefixing the wrapped error's message
// with "unretriable error:".
func (ue *unretriableError) Error() string {
	return fmt.Sprintf("unretriable error: %s", ue.err.Error())
}
// FetchInitConfigurationFromCluster fetches configuration from a ConfigMap in the cluster
func FetchInitConfigurationFromCluster(client clientset.Interface, w io.Writer, logPrefix string, newControlPlane, skipComponentConfigs bool) (*kubeadmapi.InitConfiguration, error) {
fmt.Fprintf(w, "[%s] Reading configuration from the cluster...\n", logPrefix)
@ -216,17 +196,6 @@ func getAPIEndpointWithBackoff(client clientset.Interface, nodeName string, apiE
return nil
}
errs = append(errs, errors.WithMessagef(err, "could not retrieve API endpoints for node %q using pod annotations", nodeName))
// NB: this is a fallback when there is no annotation found in the API server pod that contains
// the API endpoint, and so we fallback to reading the ClusterStatus struct present in the
// kubeadm-config ConfigMap. This can happen for example, when performing the first
// `kubeadm upgrade apply` and `kubeadm upgrade node` cycle on the whole cluster. This logic
// will be removed when the cluster status struct is removed from the kubeadm-config ConfigMap.
if err = getAPIEndpointFromClusterStatus(client, nodeName, apiEndpoint); err == nil {
return nil
}
errs = append(errs, errors.WithMessagef(err, "could not retrieve API endpoints for node %q using cluster status", nodeName))
return errorsutil.NewAggregate(errs)
}
@ -237,13 +206,6 @@ func getAPIEndpointFromPodAnnotation(client clientset.Interface, nodeName string
// static pods were not yet mirrored into the API server we want to wait for this propagation.
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
rawAPIEndpoint, lastErr = getRawAPIEndpointFromPodAnnotationWithoutRetry(client, nodeName)
// TODO (ereslibre): this logic will need tweaking once that we get rid of the ClusterStatus, since we won't have
// the ClusterStatus safety net, we will want to remove the UnretriableError and not make the distinction here
// anymore.
if _, ok := lastErr.(*unretriableError); ok {
// Fail fast scenario, to be removed once we get rid of the ClusterStatus
return true, errors.Wrapf(lastErr, "API server Pods exist, but no API endpoint annotations were found")
}
return lastErr == nil, nil
})
if err != nil {
@ -274,50 +236,5 @@ func getRawAPIEndpointFromPodAnnotationWithoutRetry(client clientset.Interface,
if apiServerEndpoint, ok := podList.Items[0].Annotations[constants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey]; ok {
return apiServerEndpoint, nil
}
return "", newUnretriableError(errors.Errorf("API server pod for node name %q hasn't got a %q annotation, cannot retrieve API endpoint", nodeName, constants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey))
}
// getAPIEndpointFromClusterStatus looks up the API endpoint of the given node in
// the ClusterStatus stored in the kubeadm-config ConfigMap and copies it into
// apiEndpoint. It errors if the node has no entry in the status.
// TODO: remove after 1.20, when the ClusterStatus struct is removed from the kubeadm-config ConfigMap.
func getAPIEndpointFromClusterStatus(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint) error {
	clusterStatus, err := GetClusterStatus(client)
	if err != nil {
		return errors.Wrap(err, "could not retrieve cluster status")
	}
	if statusAPIEndpoint, ok := clusterStatus.APIEndpoints[nodeName]; ok {
		*apiEndpoint = statusAPIEndpoint
		return nil
	}
	return errors.Errorf("could not find node %s in the cluster status", nodeName)
}
// GetClusterStatus returns the kubeadm cluster status read from the kubeadm-config ConfigMap.
// A missing ConfigMap is not an error: an empty ClusterStatus is returned in that case.
func GetClusterStatus(client clientset.Interface) (*kubeadmapi.ClusterStatus, error) {
	configMap, err := apiclient.GetConfigMapWithRetry(client, metav1.NamespaceSystem, constants.KubeadmConfigConfigMap)
	if apierrors.IsNotFound(err) {
		// No kubeadm-config ConfigMap yet: report an empty status rather than failing.
		return &kubeadmapi.ClusterStatus{}, nil
	}
	if err != nil {
		return nil, err
	}
	clusterStatus, err := UnmarshalClusterStatus(configMap.Data)
	if err != nil {
		return nil, err
	}
	return clusterStatus, nil
}
// UnmarshalClusterStatus takes raw ConfigMap.Data and converts it to a ClusterStatus object
func UnmarshalClusterStatus(data map[string]string) (*kubeadmapi.ClusterStatus, error) {
clusterStatusData, ok := data[constants.ClusterStatusConfigMapKey]
if !ok {
return nil, errors.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", constants.ClusterStatusConfigMapKey)
}
clusterStatus := &kubeadmapi.ClusterStatus{}
if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterStatusData), clusterStatus); err != nil {
return nil, err
}
return clusterStatus, nil
return "", errors.Errorf("API server pod for node name %q hasn't got a %q annotation, cannot retrieve API endpoint", nodeName, constants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey)
}

View File

@ -364,17 +364,16 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
name string
nodeName string
staticPod *testresources.FakeStaticPod
configMap *testresources.FakeConfigMap
expectedEndpoint *kubeadmapi.APIEndpoint
expectedErr bool
}{
{
name: "no pod annotations; no ClusterStatus",
name: "no pod annotations",
nodeName: nodeName,
expectedErr: true,
},
{
name: "valid ipv4 endpoint in pod annotation; no ClusterStatus",
name: "valid ipv4 endpoint in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -385,7 +384,7 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
expectedEndpoint: &kubeadmapi.APIEndpoint{AdvertiseAddress: "1.2.3.4", BindPort: 1234},
},
{
name: "invalid ipv4 endpoint in pod annotation; no ClusterStatus",
name: "invalid ipv4 endpoint in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -396,7 +395,7 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
expectedErr: true,
},
{
name: "invalid negative port with ipv4 address in pod annotation; no ClusterStatus",
name: "invalid negative port with ipv4 address in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -407,7 +406,7 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
expectedErr: true,
},
{
name: "invalid high port with ipv4 address in pod annotation; no ClusterStatus",
name: "invalid high port with ipv4 address in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -418,7 +417,7 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
expectedErr: true,
},
{
name: "valid ipv6 endpoint in pod annotation; no ClusterStatus",
name: "valid ipv6 endpoint in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -429,7 +428,7 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
expectedEndpoint: &kubeadmapi.APIEndpoint{AdvertiseAddress: "::1", BindPort: 1234},
},
{
name: "invalid ipv6 endpoint in pod annotation; no ClusterStatus",
name: "invalid ipv6 endpoint in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -440,7 +439,7 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
expectedErr: true,
},
{
name: "invalid negative port with ipv6 address in pod annotation; no ClusterStatus",
name: "invalid negative port with ipv6 address in pod annotation",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
@ -461,24 +460,6 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
},
expectedErr: true,
},
{
name: "no pod annotations; ClusterStatus with valid ipv4 endpoint",
nodeName: nodeName,
configMap: testresources.ClusterStatusWithAPIEndpoint(nodeName, kubeadmapi.APIEndpoint{AdvertiseAddress: "1.2.3.4", BindPort: 1234}),
expectedEndpoint: &kubeadmapi.APIEndpoint{AdvertiseAddress: "1.2.3.4", BindPort: 1234},
},
{
name: "invalid ipv4 endpoint in pod annotation; ClusterStatus with valid ipv4 endpoint",
nodeName: nodeName,
staticPod: &testresources.FakeStaticPod{
Component: kubeadmconstants.KubeAPIServer,
Annotations: map[string]string{
kubeadmconstants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey: "1.2.3::1234",
},
},
configMap: testresources.ClusterStatusWithAPIEndpoint(nodeName, kubeadmapi.APIEndpoint{AdvertiseAddress: "1.2.3.4", BindPort: 1234}),
expectedEndpoint: &kubeadmapi.APIEndpoint{AdvertiseAddress: "1.2.3.4", BindPort: 1234},
},
}
for _, rt := range tests {
@ -491,12 +472,6 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
return
}
}
if rt.configMap != nil {
if err := rt.configMap.Create(client); err != nil {
t.Error("could not create ConfigMap")
return
}
}
apiEndpoint := kubeadmapi.APIEndpoint{}
err := getAPIEndpointWithBackoff(client, rt.nodeName, &apiEndpoint, wait.Backoff{Duration: 0, Jitter: 0, Steps: 1})
if err != nil && !rt.expectedErr {
@ -545,7 +520,7 @@ func TestGetInitConfigurationFromCluster(t *testing.T) {
expectedError: true,
},
{
name: "valid v1beta2 - new control plane == false", // InitConfiguration composed with data from different places, with also node specific information from ClusterStatus and node
name: "valid v1beta2 - new control plane == false", // InitConfiguration composed with data from different places, with also node specific information
staticPods: []testresources.FakeStaticPod{
{
NodeName: nodeName,
@ -622,7 +597,7 @@ func TestGetInitConfigurationFromCluster(t *testing.T) {
newControlPlane: true,
},
{
name: "valid v1beta3 - new control plane == false", // InitConfiguration composed with data from different places, with also node specific information from ClusterStatus and node
name: "valid v1beta3 - new control plane == false", // InitConfiguration composed with data from different places, with also node specific information
staticPods: []testresources.FakeStaticPod{
{
NodeName: nodeName,
@ -767,85 +742,6 @@ func TestGetInitConfigurationFromCluster(t *testing.T) {
}
}
// TestGetGetClusterStatus exercises GetClusterStatus against a fake clientset,
// covering a missing ConfigMap, valid v1beta2/v1beta3 payloads, a ConfigMap
// without the ClusterStatus key, and a ConfigMap with unparsable contents.
func TestGetGetClusterStatus(t *testing.T) {
	tests := []struct {
		name          string
		configMaps    []testresources.FakeConfigMap
		expectedError bool
	}{
		{
			name: "invalid missing config map",
		},
		{
			name: "valid v1beta2",
			configMaps: []testresources.FakeConfigMap{
				{
					Name: kubeadmconstants.KubeadmConfigConfigMap,
					Data: map[string]string{
						kubeadmconstants.ClusterStatusConfigMapKey: string(cfgFiles["ClusterStatus_v1beta2"]),
					},
				},
			},
		},
		{
			name: "valid v1beta3",
			configMaps: []testresources.FakeConfigMap{
				{
					Name: kubeadmconstants.KubeadmConfigConfigMap,
					Data: map[string]string{
						kubeadmconstants.ClusterStatusConfigMapKey: string(cfgFiles["ClusterStatus_v1beta3"]),
					},
				},
			},
		},
		{
			name: "invalid missing ClusterStatusConfigMapKey in the config map",
			configMaps: []testresources.FakeConfigMap{
				{
					Name: kubeadmconstants.KubeadmConfigConfigMap,
					Data: map[string]string{},
				},
			},
			expectedError: true,
		},
		{
			name: "invalid wrong value in the config map",
			configMaps: []testresources.FakeConfigMap{
				{
					Name: kubeadmconstants.KubeadmConfigConfigMap,
					Data: map[string]string{
						kubeadmconstants.ClusterStatusConfigMapKey: "not a kubeadm type",
					},
				},
			},
			expectedError: true,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			client := clientsetfake.NewSimpleClientset()
			// Seed the fake cluster with the test's ConfigMaps before calling
			// the function under test.
			for _, cm := range tc.configMaps {
				if err := cm.Create(client); err != nil {
					t.Errorf("couldn't create ConfigMap %s", cm.Name)
					return
				}
			}
			_, err := GetClusterStatus(client)
			// An error is expected exactly when expectedError is set.
			if tc.expectedError != (err != nil) {
				t.Errorf("unexpected return err from GetClusterStatus: %v", err)
			}
		})
	}
}
func TestGetAPIEndpointFromPodAnnotation(t *testing.T) {
var tests = []struct {
name string

View File

@ -37,7 +37,6 @@ import (
"k8s.io/klog/v2"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/config"
)
const etcdTimeout = 2 * time.Second
@ -127,16 +126,7 @@ func getEtcdEndpoints(client clientset.Interface) ([]string, error) {
}
func getEtcdEndpointsWithBackoff(client clientset.Interface, backoff wait.Backoff) ([]string, error) {
etcdEndpoints, err := getRawEtcdEndpointsFromPodAnnotation(client, backoff)
if err != nil {
// NB: this is a fallback when there is no annotation found in the etcd pods that contains
// the client URL, and so we fallback to reading the ClusterStatus struct present in the
// kubeadm-config ConfigMap. This can happen for example, when performing the first
// `kubeadm upgrade apply`. This logic will be removed when the cluster status struct
// is removed from the kubeadm-config ConfigMap.
return getRawEtcdEndpointsFromClusterStatus(client)
}
return etcdEndpoints, nil
return getRawEtcdEndpointsFromPodAnnotation(client, backoff)
}
// getRawEtcdEndpointsFromPodAnnotation returns the list of endpoints as reported on etcd's pod annotations using the given backoff
@ -150,22 +140,19 @@ func getRawEtcdEndpointsFromPodAnnotation(client clientset.Interface, backoff wa
if etcdEndpoints, overallEtcdPodCount, lastErr = getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client); lastErr != nil {
return false, nil
}
// TODO (ereslibre): this logic will need tweaking once that we get rid of the ClusterStatus, since we won't have
// the ClusterStatus safety net we will have to retry in both cases.
if len(etcdEndpoints) == 0 {
if overallEtcdPodCount == 0 {
return false, nil
}
// Fail fast scenario, to be removed once we get rid of the ClusterStatus
return true, errors.New("etcd Pods exist, but no etcd endpoint annotations were found")
if len(etcdEndpoints) == 0 || overallEtcdPodCount != len(etcdEndpoints) {
klog.V(4).Infof("found a total of %d etcd pods and the following endpoints: %v; retrying",
overallEtcdPodCount, etcdEndpoints)
return false, nil
}
return true, nil
})
if err != nil {
const message = "could not retrieve the list of etcd endpoints"
if lastErr != nil {
return []string{}, errors.Wrap(lastErr, "could not retrieve the list of etcd endpoints")
return []string{}, errors.Wrap(lastErr, message)
}
return []string{}, errors.Wrap(err, "could not retrieve the list of etcd endpoints")
return []string{}, errors.Wrap(err, message)
}
return etcdEndpoints, nil
}
@ -196,20 +183,6 @@ func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface
return etcdEndpoints, len(podList.Items), nil
}
// TODO: remove after 1.20, when the ClusterStatus struct is removed from the kubeadm-config ConfigMap.
func getRawEtcdEndpointsFromClusterStatus(client clientset.Interface) ([]string, error) {
klog.V(3).Info("retrieving etcd endpoints from the cluster status")
clusterStatus, err := config.GetClusterStatus(client)
if err != nil {
return []string{}, err
}
etcdEndpoints := []string{}
for _, e := range clusterStatus.APIEndpoints {
etcdEndpoints = append(etcdEndpoints, GetClientURLByIP(e.AdvertiseAddress))
}
return etcdEndpoints, nil
}
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
// Syncs the list of endpoints

View File

@ -119,16 +119,16 @@ func TestGetEtcdEndpointsWithBackoff(t *testing.T) {
var tests = []struct {
name string
pods []testresources.FakeStaticPod
configMap *testresources.FakeConfigMap
expectedEndpoints []string
expectedErr bool
}{
{
name: "no pod annotations; no ClusterStatus",
name: "no pod annotations",
expectedEndpoints: []string{},
expectedErr: true,
},
{
name: "ipv4 endpoint in pod annotation; no ClusterStatus; port is preserved",
name: "ipv4 endpoint in pod annotation; port is preserved",
pods: []testresources.FakeStaticPod{
{
Component: constants.Etcd,
@ -139,11 +139,6 @@ func TestGetEtcdEndpointsWithBackoff(t *testing.T) {
},
expectedEndpoints: []string{"https://1.2.3.4:1234"},
},
{
name: "no pod annotations; ClusterStatus with valid ipv4 endpoint; port is inferred",
configMap: testresources.ClusterStatusWithAPIEndpoint("cp-0", kubeadmapi.APIEndpoint{AdvertiseAddress: "1.2.3.4", BindPort: 1234}),
expectedEndpoints: []string{"https://1.2.3.4:2379"},
},
}
for _, rt := range tests {
t.Run(rt.name, func(t *testing.T) {
@ -153,11 +148,6 @@ func TestGetEtcdEndpointsWithBackoff(t *testing.T) {
t.Errorf("error setting up test creating pod for node %q", pod.NodeName)
}
}
if rt.configMap != nil {
if err := rt.configMap.Create(client); err != nil {
t.Error("could not create ConfigMap")
}
}
endpoints, err := getEtcdEndpointsWithBackoff(client, wait.Backoff{Duration: 0, Jitter: 0, Steps: 1})
if err != nil && !rt.expectedErr {
t.Errorf("got error %q; was expecting no errors", err)
@ -195,6 +185,22 @@ func TestGetRawEtcdEndpointsFromPodAnnotation(t *testing.T) {
},
expectedEndpoints: []string{"https://1.2.3.4:2379"},
},
{
name: "two pods; one is missing annotation",
pods: []testresources.FakeStaticPod{
{
NodeName: "cp-0",
Component: constants.Etcd,
Annotations: map[string]string{constants.EtcdAdvertiseClientUrlsAnnotationKey: "https://1.2.3.4:2379"},
},
{
NodeName: "cp-1",
Component: constants.Etcd,
},
},
expectedEndpoints: []string{"https://1.2.3.4:2379"},
expectedErr: true,
},
{
name: "no pods with annotation",
expectedErr: true,
@ -267,6 +273,21 @@ func TestGetRawEtcdEndpointsFromPodAnnotationWithoutRetry(t *testing.T) {
},
expectedEndpoints: []string{"https://1.2.3.4:2379"},
},
{
name: "two pods; one is missing annotation",
pods: []testresources.FakeStaticPod{
{
NodeName: "cp-0",
Component: constants.Etcd,
Annotations: map[string]string{constants.EtcdAdvertiseClientUrlsAnnotationKey: "https://1.2.3.4:2379"},
},
{
NodeName: "cp-1",
Component: constants.Etcd,
},
},
expectedEndpoints: []string{"https://1.2.3.4:2379"},
},
{
name: "two pods with annotation",
pods: []testresources.FakeStaticPod{

View File

@ -1,41 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"encoding/json"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
)
// ClusterStatusWithAPIEndpoint returns a FakeConfigMap containing a
// cluster status with the provided endpoint for nodeName as a single
// entry
func ClusterStatusWithAPIEndpoint(nodeName string, endpoint kubeadmapi.APIEndpoint) *FakeConfigMap {
	marshaledClusterStatus, err := json.Marshal(kubeadmapi.ClusterStatus{
		APIEndpoints: map[string]kubeadmapi.APIEndpoint{
			nodeName: endpoint,
		},
	})
	if err != nil {
		// This is a test fixture helper marshaling a fixed, known-serializable
		// struct; a failure here is a programming error, so fail loudly instead
		// of silently producing a fixture with empty data.
		panic(err)
	}
	return &FakeConfigMap{
		Name: constants.KubeadmConfigConfigMap,
		Data: map[string]string{
			constants.ClusterStatusConfigMapKey: string(marshaledClusterStatus),
		},
	}
}