Merge pull request #95533 from roycaihw/apiserver-lease-controller

Add kube-apiserver lease controller
This commit is contained in:
Kubernetes Prow Robot 2020-11-06 18:09:37 -08:00 committed by GitHub
commit 281866b35c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 164 additions and 10 deletions

View File

@ -84,6 +84,9 @@ type ServerRunOptions struct {
MasterCount int
EndpointReconcilerType string
IdentityLeaseDurationSeconds int
IdentityLeaseRenewIntervalSeconds int
ServiceAccountSigningKeyFile string
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountTokenMaxExpiration time.Duration
@ -108,10 +111,12 @@ func NewServerRunOptions() *ServerRunOptions {
Metrics: metrics.NewOptions(),
Logs: logs.NewOptions(),
EnableLogsHandler: true,
EventTTL: 1 * time.Hour,
MasterCount: 1,
EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType),
EnableLogsHandler: true,
EventTTL: 1 * time.Hour,
MasterCount: 1,
EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType),
IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10,
KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
@ -209,6 +214,12 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
fs.StringVar(&s.EndpointReconcilerType, "endpoint-reconciler-type", string(s.EndpointReconcilerType),
"Use an endpoint reconciler ("+strings.Join(reconcilers.AllTypes.Names(), ", ")+")")
fs.IntVar(&s.IdentityLeaseDurationSeconds, "identity-lease-duration-seconds", s.IdentityLeaseDurationSeconds,
"The duration of kube-apiserver lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.)")
fs.IntVar(&s.IdentityLeaseRenewIntervalSeconds, "identity-lease-renew-interval-seconds", s.IdentityLeaseRenewIntervalSeconds,
"The interval of kube-apiserver renewing its lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.)")
// See #14282 for details on how to test/try this option out.
// TODO: remove this comment once this option is tested in CI.
fs.IntVar(&s.KubernetesServiceNodePort, "kubernetes-service-node-port", s.KubernetesServiceNodePort, ""+

View File

@ -299,12 +299,14 @@ func TestAddFlags(t *testing.T) {
EgressSelector: &apiserveroptions.EgressSelectorOptions{
ConfigFile: "/var/run/kubernetes/egress-selector/connectivity.yaml",
},
EnableLogsHandler: false,
EnableAggregatorRouting: true,
ProxyClientKeyFile: "/var/run/kubernetes/proxy.key",
ProxyClientCertFile: "/var/run/kubernetes/proxy.crt",
Metrics: &metrics.Options{},
Logs: logs.NewOptions(),
EnableLogsHandler: false,
EnableAggregatorRouting: true,
ProxyClientKeyFile: "/var/run/kubernetes/proxy.key",
ProxyClientCertFile: "/var/run/kubernetes/proxy.crt",
Metrics: &metrics.Options{},
Logs: logs.NewOptions(),
IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10,
}
if !reflect.DeepEqual(expected, s) {

View File

@ -169,6 +169,12 @@ func (s *ServerRunOptions) Validate() []error {
errs = append(errs, validateTokenRequest(s)...)
errs = append(errs, s.Metrics.Validate()...)
errs = append(errs, s.Logs.Validate()...)
if s.IdentityLeaseDurationSeconds <= 0 {
errs = append(errs, fmt.Errorf("--identity-lease-duration-seconds should be a positive number, but value '%d' provided", s.IdentityLeaseDurationSeconds))
}
if s.IdentityLeaseRenewIntervalSeconds <= 0 {
errs = append(errs, fmt.Errorf("--identity-lease-renew-interval-seconds should be a positive number, but value '%d' provided", s.IdentityLeaseRenewIntervalSeconds))
}
return errs
}

View File

@ -375,6 +375,9 @@ func CreateKubeAPIServerConfig(
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds,
IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
},
}

View File

@ -109,11 +109,13 @@ go_library(
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library",
@ -126,6 +128,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-helpers/lease:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/integer:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",

View File

@ -64,9 +64,12 @@ import (
storageapiv1alpha1 "k8s.io/api/storage/v1alpha1"
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
@ -78,6 +81,7 @@ import (
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1beta1"
"k8s.io/component-helpers/lease"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
@ -120,6 +124,12 @@ const (
DefaultEndpointReconcilerInterval = 10 * time.Second
// DefaultEndpointReconcilerTTL is the default TTL timeout for the storage layer
DefaultEndpointReconcilerTTL = 15 * time.Second
// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
// 1. the lease is an identity lease (different from leader election leases)
// 2. which component owns this lease
IdentityLeaseComponentLabelKey = "k8s.io/component"
// KubeAPIServer defines variable used internally when referring to kube-apiserver component
KubeAPIServer = "kube-apiserver"
)
// ExtraConfig defines extra configuration for the master
@ -198,6 +208,9 @@ type ExtraConfig struct {
ServiceAccountPublicKeys []interface{}
VersionedInformers informers.SharedInformerFactory
IdentityLeaseDurationSeconds int
IdentityLeaseRenewIntervalSeconds int
}
// Config defines configuration for the master
@ -482,9 +495,38 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
controller := lease.NewController(
clock.RealClock{},
kubeClient,
m.GenericAPIServer.APIServerID,
int32(c.ExtraConfig.IdentityLeaseDurationSeconds),
nil,
time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second,
metav1.NamespaceSystem,
labelAPIServerHeartbeat)
go controller.Run(wait.NeverStop)
return nil
})
}
return m, nil
}
func labelAPIServerHeartbeat(lease *coordinationapiv1.Lease) error {
if lease.Labels == nil {
lease.Labels = map[string]string{}
}
// This label indicates that kube-apiserver owns this identity lease object
lease.Labels[IdentityLeaseComponentLabelKey] = KubeAPIServer
return nil
}
// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

View File

@ -148,6 +148,12 @@ const (
//
// Allows for updating watchcache resource version with progress notify events.
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
// owner: @roycaihw
// alpha: v1.20
//
// Assigns each kube-apiserver an ID in a cluster.
APIServerIdentity featuregate.Feature = "APIServerIdentity"
)
func init() {
@ -173,4 +179,5 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
SelectorIndex: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
APIServerIdentity: {Default: false, PreRelease: featuregate.Alpha},
}

View File

@ -224,6 +224,9 @@ type Config struct {
// EquivalentResourceRegistry provides information about resources equivalent to a given resource,
// and the kind associated with a given resource. As resources are installed, they are registered here.
EquivalentResourceRegistry runtime.EquivalentResourceRegistry
// APIServerID is the ID of this API server
APIServerID string
}
type RecommendedConfig struct {
@ -287,6 +290,10 @@ type AuthorizationInfo struct {
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
var id string
if feature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
id = "kube-apiserver-" + uuid.New().String()
}
return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
@ -325,6 +332,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
APIServerID: id,
}
}
@ -574,6 +582,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
APIServerID: c.APIServerID,
}
for {

View File

@ -196,6 +196,9 @@ type GenericAPIServer struct {
// The limit on the request body size that would be accepted and decoded in a write request.
// 0 means no limit.
maxRequestBodyBytes int64
// APIServerID is the ID of this API server
APIServerID string
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works

View File

@ -10,6 +10,7 @@ go_test(
name = "go_default_test",
size = "large",
srcs = [
"apiserver_identity_test.go",
"audit_test.go",
"crd_test.go",
"graceful_shutdown_test.go",
@ -49,14 +50,17 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/tokentest:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//test/integration:go_default_library",
"//test/integration/etcd:go_default_library",

View File

@ -0,0 +1,64 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"context"
"strings"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration/framework"
)
const apiserverIdentityLeaseLabelSelector = controlplane.IdentityLeaseComponentLabelKey + "=" + controlplane.KubeAPIServer
func TestCreateLeaseOnStart(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
t.Logf(`Waiting the kube-apiserver Lease to be created`)
if err := wait.PollImmediate(500*time.Millisecond, 10*time.Second, func() (bool, error) {
leases, err := kubeclient.
CoordinationV1().
Leases(metav1.NamespaceSystem).
List(context.TODO(), metav1.ListOptions{LabelSelector: apiserverIdentityLeaseLabelSelector})
if err != nil {
return false, err
}
if leases != nil && len(leases.Items) == 1 && strings.HasPrefix(leases.Items[0].Name, "kube-apiserver-") {
return true, nil
}
return false, nil
}); err != nil {
t.Fatalf("Failed to see the kube-apiserver lease: %v", err)
}
}