diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index fc709597a75..68a714bfe58 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -84,6 +84,9 @@ type ServerRunOptions struct { MasterCount int EndpointReconcilerType string + IdentityLeaseDurationSeconds int + IdentityLeaseRenewIntervalSeconds int + ServiceAccountSigningKeyFile string ServiceAccountIssuer serviceaccount.TokenGenerator ServiceAccountTokenMaxExpiration time.Duration @@ -108,10 +111,12 @@ func NewServerRunOptions() *ServerRunOptions { Metrics: metrics.NewOptions(), Logs: logs.NewOptions(), - EnableLogsHandler: true, - EventTTL: 1 * time.Hour, - MasterCount: 1, - EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType), + EnableLogsHandler: true, + EventTTL: 1 * time.Hour, + MasterCount: 1, + EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType), + IdentityLeaseDurationSeconds: 3600, + IdentityLeaseRenewIntervalSeconds: 10, KubeletConfig: kubeletclient.KubeletClientConfig{ Port: ports.KubeletPort, ReadOnlyPort: ports.KubeletReadOnlyPort, @@ -209,6 +214,12 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { fs.StringVar(&s.EndpointReconcilerType, "endpoint-reconciler-type", string(s.EndpointReconcilerType), "Use an endpoint reconciler ("+strings.Join(reconcilers.AllTypes.Names(), ", ")+")") + fs.IntVar(&s.IdentityLeaseDurationSeconds, "identity-lease-duration-seconds", s.IdentityLeaseDurationSeconds, + "The duration of kube-apiserver lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.)") + + fs.IntVar(&s.IdentityLeaseRenewIntervalSeconds, "identity-lease-renew-interval-seconds", s.IdentityLeaseRenewIntervalSeconds, + "The interval of kube-apiserver renewing its lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.)") + // See #14282 for details on how to test/try this option out. // TODO: remove this comment once this option is tested in CI. fs.IntVar(&s.KubernetesServiceNodePort, "kubernetes-service-node-port", s.KubernetesServiceNodePort, ""+ diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index f16c59ed8ee..8ad927bbe8f 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -299,12 +299,14 @@ func TestAddFlags(t *testing.T) { EgressSelector: &apiserveroptions.EgressSelectorOptions{ ConfigFile: "/var/run/kubernetes/egress-selector/connectivity.yaml", }, - EnableLogsHandler: false, - EnableAggregatorRouting: true, - ProxyClientKeyFile: "/var/run/kubernetes/proxy.key", - ProxyClientCertFile: "/var/run/kubernetes/proxy.crt", - Metrics: &metrics.Options{}, - Logs: logs.NewOptions(), + EnableLogsHandler: false, + EnableAggregatorRouting: true, + ProxyClientKeyFile: "/var/run/kubernetes/proxy.key", + ProxyClientCertFile: "/var/run/kubernetes/proxy.crt", + Metrics: &metrics.Options{}, + Logs: logs.NewOptions(), + IdentityLeaseDurationSeconds: 3600, + IdentityLeaseRenewIntervalSeconds: 10, } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/kube-apiserver/app/options/validation.go b/cmd/kube-apiserver/app/options/validation.go index 396a40ad3be..7de622ceb16 100644 --- a/cmd/kube-apiserver/app/options/validation.go +++ b/cmd/kube-apiserver/app/options/validation.go @@ -169,6 +169,12 @@ func (s *ServerRunOptions) Validate() []error { errs = append(errs, validateTokenRequest(s)...) errs = append(errs, s.Metrics.Validate()...) errs = append(errs, s.Logs.Validate()...) + if s.IdentityLeaseDurationSeconds <= 0 { + errs = append(errs, fmt.Errorf("--identity-lease-duration-seconds should be a positive number, but value '%d' provided", s.IdentityLeaseDurationSeconds)) + } + if s.IdentityLeaseRenewIntervalSeconds <= 0 { + errs = append(errs, fmt.Errorf("--identity-lease-renew-interval-seconds should be a positive number, but value '%d' provided", s.IdentityLeaseRenewIntervalSeconds)) + } return errs } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index be368e177af..fcb7ff957ce 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -375,6 +375,9 @@ func CreateKubeAPIServerConfig( ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration, VersionedInformers: versionedInformers, + + IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds, + IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds, }, } diff --git a/pkg/controlplane/BUILD b/pkg/controlplane/BUILD index 108f7c8b1c5..b9c2d0637de 100644 --- a/pkg/controlplane/BUILD +++ b/pkg/controlplane/BUILD @@ -109,11 +109,13 @@ go_library( "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library", @@ -126,6 +128,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-helpers/lease:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/utils/integer:go_default_library", "//vendor/k8s.io/utils/net:go_default_library", diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index eedc5b3a924..744185f9901 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -64,9 +64,12 @@ import ( storageapiv1alpha1 "k8s.io/api/storage/v1alpha1" storageapiv1beta1 "k8s.io/api/storage/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/discovery" + apiserverfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -78,6 +81,7 @@ import ( "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1beta1" + "k8s.io/component-helpers/lease" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" "k8s.io/kubernetes/pkg/controlplane/reconcilers" @@ -120,6 +124,12 @@ const ( DefaultEndpointReconcilerInterval = 10 * time.Second // DefaultEndpointReconcilerTTL is the default TTL timeout for the storage layer DefaultEndpointReconcilerTTL = 15 * time.Second + // IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating: + // 1. the lease is an identity lease (different from leader election leases) + // 2. which component owns this lease + IdentityLeaseComponentLabelKey = "k8s.io/component" + // KubeAPIServer defines variable used internally when referring to kube-apiserver component + KubeAPIServer = "kube-apiserver" ) // ExtraConfig defines extra configuration for the master @@ -198,6 +208,9 @@ type ExtraConfig struct { ServiceAccountPublicKeys []interface{} VersionedInformers informers.SharedInformerFactory + + IdentityLeaseDurationSeconds int + IdentityLeaseRenewIntervalSeconds int } // Config defines configuration for the master @@ -482,9 +495,38 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil }) + if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) { + m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error { + kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) + if err != nil { + return err + } + controller := lease.NewController( + clock.RealClock{}, + kubeClient, + m.GenericAPIServer.APIServerID, + int32(c.ExtraConfig.IdentityLeaseDurationSeconds), + nil, + time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second, + metav1.NamespaceSystem, + labelAPIServerHeartbeat) + go controller.Run(wait.NeverStop) + return nil + }) + } + return m, nil } +func labelAPIServerHeartbeat(lease *coordinationapiv1.Lease) error { + if lease.Labels == nil { + lease.Labels = map[string]string{} + } + // This label indicates that kube-apiserver owns this identity lease object + lease.Labels[IdentityLeaseComponentLabelKey] = KubeAPIServer + return nil +} + // InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled. func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error { legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter) diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index c25f2695286..a45bfa83c37 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -148,6 +148,12 @@ const ( // // Allows for updating watchcache resource version with progress notify events. EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption" + + // owner: @roycaihw + // alpha: v1.20 + // + // Assigns each kube-apiserver an ID in a cluster. + APIServerIdentity featuregate.Feature = "APIServerIdentity" ) func init() { @@ -173,4 +179,5 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS SelectorIndex: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, WarningHeaders: {Default: true, PreRelease: featuregate.Beta}, EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha}, + APIServerIdentity: {Default: false, PreRelease: featuregate.Alpha}, } diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index e3e8afcdcaa..80be10c01fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -224,6 +224,9 @@ type Config struct { // EquivalentResourceRegistry provides information about resources equivalent to a given resource, // and the kind associated with a given resource. As resources are installed, they are registered here. EquivalentResourceRegistry runtime.EquivalentResourceRegistry + + // APIServerID is the ID of this API server + APIServerID string } type RecommendedConfig struct { @@ -287,6 +290,10 @@ type AuthorizationInfo struct { // NewConfig returns a Config struct with the default values func NewConfig(codecs serializer.CodecFactory) *Config { defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz} + var id string + if feature.DefaultFeatureGate.Enabled(features.APIServerIdentity) { + id = "kube-apiserver-" + uuid.New().String() + } return &Config{ Serializer: codecs, BuildHandlerChainFunc: DefaultBuildHandlerChain, @@ -325,6 +332,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), + APIServerID: id, } } @@ -574,6 +582,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, + APIServerID: c.APIServerID, } for { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index cc67b1f695c..07cef25bbea 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -196,6 +196,9 @@ type GenericAPIServer struct { // The limit on the request body size that would be accepted and decoded in a write request. // 0 means no limit. maxRequestBodyBytes int64 + + // APIServerID is the ID of this API server + APIServerID string } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works diff --git a/test/integration/master/BUILD b/test/integration/master/BUILD index f6b1ad3b98b..de62dad8fd8 100644 --- a/test/integration/master/BUILD +++ b/test/integration/master/BUILD @@ -10,6 +10,7 @@ go_test( name = "go_default_test", size = "large", srcs = [ + "apiserver_identity_test.go", "audit_test.go", "crd_test.go", "graceful_shutdown_test.go", @@ -49,14 +50,17 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/tokentest:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//test/integration:go_default_library", "//test/integration/etcd:go_default_library", diff --git a/test/integration/master/apiserver_identity_test.go b/test/integration/master/apiserver_identity_test.go new file mode 100644 index 00000000000..a36c4651a38 --- /dev/null +++ b/test/integration/master/apiserver_identity_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "context" + "strings" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/kubernetes" + featuregatetesting "k8s.io/component-base/featuregate/testing" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controlplane" + "k8s.io/kubernetes/test/integration/framework" +) + +const apiserverIdentityLeaseLabelSelector = controlplane.IdentityLeaseComponentLabelKey + "=" + controlplane.KubeAPIServer + +func TestCreateLeaseOnStart(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer result.TearDownFn() + + kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + t.Logf(`Waiting the kube-apiserver Lease to be created`) + if err := wait.PollImmediate(500*time.Millisecond, 10*time.Second, func() (bool, error) { + leases, err := kubeclient. + CoordinationV1(). + Leases(metav1.NamespaceSystem). + List(context.TODO(), metav1.ListOptions{LabelSelector: apiserverIdentityLeaseLabelSelector}) + if err != nil { + return false, err + } + if leases != nil && len(leases.Items) == 1 && strings.HasPrefix(leases.Items[0].Name, "kube-apiserver-") { + return true, nil + } + return false, nil + }); err != nil { + t.Fatalf("Failed to see the kube-apiserver lease: %v", err) + } +}