add kube-apiserver-lease-controller poststart hook

This commit is contained in:
Haowei Cai 2020-10-26 13:17:00 -07:00
parent 742ba5f24a
commit 3761a00e5b
8 changed files with 150 additions and 10 deletions

View File

@ -79,6 +79,9 @@ type ServerRunOptions struct {
MasterCount int
EndpointReconcilerType string
IdentityLeaseDurationSeconds int
IdentityLeaseRenewIntervalSeconds int
ServiceAccountSigningKeyFile string
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountTokenMaxExpiration time.Duration
@ -108,6 +111,8 @@ func NewServerRunOptions() *ServerRunOptions {
EventTTL: 1 * time.Hour,
MasterCount: 1,
EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType),
IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10,
KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
@ -186,6 +191,12 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
fs.StringVar(&s.EndpointReconcilerType, "endpoint-reconciler-type", string(s.EndpointReconcilerType),
"Use an endpoint reconciler ("+strings.Join(reconcilers.AllTypes.Names(), ", ")+")")
fs.IntVar(&s.IdentityLeaseDurationSeconds, "identity-lease-duration-seconds", s.IdentityLeaseDurationSeconds,
"The duration of kube-apiserver lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.)")
fs.IntVar(&s.IdentityLeaseRenewIntervalSeconds, "identity-lease-renew-interval-seconds", s.IdentityLeaseRenewIntervalSeconds,
"The interval of kube-apiserver renewing its lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.)")
// See #14282 for details on how to test/try this option out.
// TODO: remove this comment once this option is tested in CI.
fs.IntVar(&s.KubernetesServiceNodePort, "kubernetes-service-node-port", s.KubernetesServiceNodePort, ""+

View File

@ -305,6 +305,8 @@ func TestAddFlags(t *testing.T) {
ProxyClientCertFile: "/var/run/kubernetes/proxy.crt",
Metrics: &metrics.Options{},
Logs: logs.NewOptions(),
IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10,
}
if !reflect.DeepEqual(expected, s) {

View File

@ -175,6 +175,12 @@ func (s *ServerRunOptions) Validate() []error {
errs = append(errs, validateTokenRequest(s)...)
errs = append(errs, s.Metrics.Validate()...)
errs = append(errs, s.Logs.Validate()...)
if s.IdentityLeaseDurationSeconds <= 0 {
errs = append(errs, fmt.Errorf("--identity-lease-duration-seconds should be a positive number, but value '%d' provided", s.IdentityLeaseDurationSeconds))
}
if s.IdentityLeaseRenewIntervalSeconds <= 0 {
errs = append(errs, fmt.Errorf("--identity-lease-renew-interval-seconds should be a positive number, but value '%d' provided", s.IdentityLeaseRenewIntervalSeconds))
}
return errs
}

View File

@ -364,6 +364,9 @@ func CreateKubeAPIServerConfig(
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds,
IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
},
}

View File

@ -64,9 +64,12 @@ import (
storageapiv1alpha1 "k8s.io/api/storage/v1alpha1"
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
@ -78,6 +81,7 @@ import (
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1beta1"
"k8s.io/component-helpers/lease"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
@ -120,6 +124,12 @@ const (
DefaultEndpointReconcilerInterval = 10 * time.Second
// DefaultEndpointReconcilerTTL is the default TTL timeout for the storage layer
DefaultEndpointReconcilerTTL = 15 * time.Second
// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
// 1. the lease is an identity lease (different from leader election leases)
// 2. which component owns this lease
IdentityLeaseComponentLabelKey = "k8s.io/component"
// KubeAPIServer defines variable used internally when referring to kube-apiserver component
KubeAPIServer = "kube-apiserver"
)
// ExtraConfig defines extra configuration for the master
@ -198,6 +208,9 @@ type ExtraConfig struct {
ServiceAccountPublicKeys []interface{}
VersionedInformers informers.SharedInformerFactory
IdentityLeaseDurationSeconds int
IdentityLeaseRenewIntervalSeconds int
}
// Config defines configuration for the master
@ -482,9 +495,38 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
controller := lease.NewController(
clock.RealClock{},
kubeClient,
m.GenericAPIServer.APIServerID,
int32(c.ExtraConfig.IdentityLeaseDurationSeconds),
nil,
time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second,
metav1.NamespaceSystem,
labelAPIServerHeartbeat)
go controller.Run(wait.NeverStop)
return nil
})
}
return m, nil
}
func labelAPIServerHeartbeat(lease *coordinationapiv1.Lease) error {
if lease.Labels == nil {
lease.Labels = map[string]string{}
}
// This label indicates that kube-apiserver owns this identity lease object
lease.Labels[IdentityLeaseComponentLabelKey] = KubeAPIServer
return nil
}
// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

View File

@ -223,6 +223,9 @@ type Config struct {
// EquivalentResourceRegistry provides information about resources equivalent to a given resource,
// and the kind associated with a given resource. As resources are installed, they are registered here.
EquivalentResourceRegistry runtime.EquivalentResourceRegistry
// APIServerID is the ID of this API server
APIServerID string
}
type RecommendedConfig struct {
@ -286,6 +289,10 @@ type AuthorizationInfo struct {
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
var id string
if feature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
id = "kube-apiserver-" + uuid.New().String()
}
return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
@ -324,6 +331,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
APIServerID: id,
}
}
@ -573,6 +581,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
APIServerID: c.APIServerID,
}
for {

View File

@ -197,6 +197,9 @@ type GenericAPIServer struct {
// The limit on the request body size that would be accepted and decoded in a write request.
// 0 means no limit.
maxRequestBodyBytes int64
// APIServerID is the ID of this API server
APIServerID string
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works

View File

@ -0,0 +1,64 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"context"
"strings"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration/framework"
)
const apiserverIdentityLeaseLabelSelector = controlplane.IdentityLeaseComponentLabelKey + "=" + controlplane.KubeAPIServer
func TestCreateLeaseOnStart(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
t.Logf(`Waiting the kube-apiserver Lease to be created`)
if err := wait.PollImmediate(500*time.Millisecond, 10*time.Second, func() (bool, error) {
leases, err := kubeclient.
CoordinationV1().
Leases(metav1.NamespaceSystem).
List(context.TODO(), metav1.ListOptions{LabelSelector: apiserverIdentityLeaseLabelSelector})
if err != nil {
return false, err
}
if leases != nil && len(leases.Items) == 1 && strings.HasPrefix(leases.Items[0].Name, "kube-apiserver-") {
return true, nil
}
return false, nil
}); err != nil {
t.Fatalf("Failed to see the kube-apiserver lease: %v", err)
}
}