mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 09:52:49 +00:00
decouple system namespaces from bootstrap controller
Use an informer instead of polling. Change-Id: Ib071e53addb914fcb31d8a1346cf61ca6d22520b
This commit is contained in:
parent
08fbe92fa7
commit
eecfaf658e
@ -49,9 +49,8 @@ const (
|
||||
)
|
||||
|
||||
// Controller is the controller manager for the core bootstrap Kubernetes
|
||||
// controller loops, which manage creating the "kubernetes" service, the
|
||||
// "default", "kube-system" and "kube-public" namespaces, and provide the IP
|
||||
// repair check on service IPs
|
||||
// controller loops, which manage creating the "kubernetes" service and
|
||||
// provide the IP repair check on service IPs
|
||||
type Controller struct {
|
||||
client kubernetes.Interface
|
||||
|
||||
@ -69,9 +68,6 @@ type Controller struct {
|
||||
EndpointReconciler reconcilers.EndpointReconciler
|
||||
EndpointInterval time.Duration
|
||||
|
||||
SystemNamespaces []string
|
||||
SystemNamespacesInterval time.Duration
|
||||
|
||||
PublicIP net.IP
|
||||
|
||||
// ServiceIP indicates where the kubernetes service will live. It may not be nil.
|
||||
@ -101,17 +97,12 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
|
||||
}
|
||||
}
|
||||
|
||||
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
|
||||
|
||||
return &Controller{
|
||||
client: client,
|
||||
|
||||
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
|
||||
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
|
||||
|
||||
SystemNamespaces: systemNamespaces,
|
||||
SystemNamespacesInterval: 1 * time.Minute,
|
||||
|
||||
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
|
||||
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
|
||||
SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
|
||||
@ -181,7 +172,7 @@ func (c *Controller) Start() {
|
||||
repairNodePorts.RunUntil(wg.Done, stopCh)
|
||||
}
|
||||
|
||||
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
|
||||
c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
|
||||
c.runner.Start()
|
||||
|
||||
// For backward compatibility, we ensure that if we never are able
|
||||
@ -225,18 +216,6 @@ func (c *Controller) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
// RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
|
||||
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
|
||||
wait.Until(func() {
|
||||
// Loop the system namespace list, and create them if they do not exist
|
||||
for _, ns := range c.SystemNamespaces {
|
||||
if err := createNamespaceIfNeeded(c.client.CoreV1(), ns); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
|
||||
}
|
||||
}
|
||||
}, c.SystemNamespacesInterval, ch)
|
||||
}
|
||||
|
||||
// RunKubernetesService periodically updates the kubernetes service
|
||||
func (c *Controller) RunKubernetesService(ch chan struct{}) {
|
||||
// wait until process is ready
|
||||
|
@ -0,0 +1,104 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package systemnamespaces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// Controller ensure system namespaces exist.
|
||||
type Controller struct {
|
||||
client kubernetes.Interface
|
||||
|
||||
namespaceLister listers.NamespaceLister
|
||||
namespaceSynced cache.InformerSynced
|
||||
|
||||
systemNamespaces []string
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// NewController creates a new Controller to ensure system namespaces exist.
|
||||
func NewController(clientset kubernetes.Interface, namespaceInformer coreinformers.NamespaceInformer) *Controller {
|
||||
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease}
|
||||
interval := 1 * time.Minute
|
||||
|
||||
return &Controller{
|
||||
client: clientset,
|
||||
namespaceLister: namespaceInformer.Lister(),
|
||||
namespaceSynced: namespaceInformer.Informer().HasSynced,
|
||||
systemNamespaces: systemNamespaces,
|
||||
interval: interval,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts one worker.
|
||||
func (c *Controller) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer klog.Infof("Shutting down system namespaces controller")
|
||||
|
||||
klog.Infof("Starting system namespaces controller")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, c.namespaceSynced) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||
return
|
||||
}
|
||||
|
||||
go wait.Until(c.sync, c.interval, stopCh)
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *Controller) sync() {
|
||||
// Loop the system namespace list, and create them if they do not exist
|
||||
for _, ns := range c.systemNamespaces {
|
||||
if err := c.createNamespaceIfNeeded(ns); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to create required kubernetes system Namespace %s: %v", ns, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) createNamespaceIfNeeded(ns string) error {
|
||||
if _, err := c.namespaceLister.Get(ns); err == nil {
|
||||
// the namespace already exists
|
||||
return nil
|
||||
}
|
||||
newNs := &v1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: ns,
|
||||
Namespace: "",
|
||||
},
|
||||
}
|
||||
_, err := c.client.CoreV1().Namespaces().Create(context.TODO(), newNs, metav1.CreateOptions{})
|
||||
if err != nil && errors.IsAlreadyExists(err) {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
@ -0,0 +1,144 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package systemnamespaces
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
k8stesting "k8s.io/client-go/testing"
|
||||
"k8s.io/gengo/examples/set-gen/sets"
|
||||
)
|
||||
|
||||
// Test_Controller validates the garbage collection logic for the apiserverleasegc controller.
|
||||
func Test_Controller(t *testing.T) {
|
||||
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
namespaces []string
|
||||
actions [][]string // verb and resource
|
||||
}{
|
||||
{
|
||||
name: "no system namespaces",
|
||||
actions: [][]string{
|
||||
{"create", "namespaces"},
|
||||
{"create", "namespaces"},
|
||||
{"create", "namespaces"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no system namespaces but others",
|
||||
namespaces: []string{"foo", "bar"},
|
||||
actions: [][]string{
|
||||
{"create", "namespaces"},
|
||||
{"create", "namespaces"},
|
||||
{"create", "namespaces"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one system namespace",
|
||||
namespaces: []string{metav1.NamespaceSystem},
|
||||
actions: [][]string{
|
||||
{"create", "namespaces"},
|
||||
{"create", "namespaces"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "two system namespaces",
|
||||
namespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic},
|
||||
actions: [][]string{
|
||||
{"create", "namespaces"},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "the three namespaces",
|
||||
namespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
objs := []runtime.Object{}
|
||||
for _, ns := range test.namespaces {
|
||||
objs = append(objs,
|
||||
&v1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: ns,
|
||||
Namespace: "",
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
clientset := fake.NewSimpleClientset(objs...)
|
||||
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
|
||||
namespaceInformer := informerFactory.Core().V1().Namespaces()
|
||||
for _, obj := range objs {
|
||||
namespaceInformer.Informer().GetIndexer().Add(obj)
|
||||
}
|
||||
|
||||
controller := NewController(clientset, namespaceInformer)
|
||||
|
||||
clientset.PrependReactor("create", "namespaces", func(action k8stesting.Action) (bool, runtime.Object, error) {
|
||||
create := action.(k8stesting.CreateAction)
|
||||
namespaceInformer.Informer().GetIndexer().Add(create.GetObject())
|
||||
return true, create.GetObject(), nil
|
||||
})
|
||||
|
||||
controller.sync()
|
||||
|
||||
expectAction(t, clientset.Actions(), test.actions)
|
||||
namespaces, err := controller.namespaceLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
got := sets.NewString()
|
||||
for _, ns := range namespaces {
|
||||
got.Insert(ns.Name)
|
||||
}
|
||||
|
||||
if !got.HasAll(systemNamespaces...) {
|
||||
t.Errorf("unexpected namespaces: %v", got.List())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) {
|
||||
t.Helper()
|
||||
if len(actions) != len(expected) {
|
||||
t.Fatalf("Expected at least %d actions, got %d", len(expected), len(actions))
|
||||
}
|
||||
|
||||
for i, action := range actions {
|
||||
verb := expected[i][0]
|
||||
if action.GetVerb() != verb {
|
||||
t.Errorf("Expected action %d verb to be %s, got %s", i, verb, action.GetVerb())
|
||||
}
|
||||
resource := expected[i][1]
|
||||
if action.GetResource().Resource != resource {
|
||||
t.Errorf("Expected action %d resource to be %s, got %s", i, resource, action.GetResource().Resource)
|
||||
}
|
||||
}
|
||||
}
|
@ -85,6 +85,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
|
||||
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
|
||||
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
|
||||
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
|
||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
@ -600,6 +601,13 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
|
||||
|
||||
controllerName := "bootstrap-controller"
|
||||
client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
// Kubernetes clusters contains the following system namespaces:
|
||||
// kube-system, kube-node-lease, kube-public
|
||||
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
|
||||
go systemnamespaces.NewController(client, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating bootstrap controller: %v", err)
|
||||
|
@ -43,6 +43,7 @@ var (
|
||||
"[+]poststarthook/start-apiextensions-controllers ok",
|
||||
"[+]poststarthook/crd-informer-synced ok",
|
||||
"[+]poststarthook/bootstrap-controller ok",
|
||||
"[+]poststarthook/start-system-namespaces-controller ok",
|
||||
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
|
||||
"[+]poststarthook/start-cluster-authentication-info-controller ok",
|
||||
"[+]poststarthook/start-kube-aggregator-informers ok",
|
||||
@ -62,6 +63,7 @@ var (
|
||||
"[+]poststarthook/start-apiextensions-controllers ok",
|
||||
"[+]poststarthook/crd-informer-synced ok",
|
||||
"[+]poststarthook/bootstrap-controller ok",
|
||||
"[+]poststarthook/start-system-namespaces-controller ok",
|
||||
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
|
||||
"[+]poststarthook/start-cluster-authentication-info-controller ok",
|
||||
"[+]poststarthook/start-kube-aggregator-informers ok",
|
||||
@ -82,6 +84,7 @@ var (
|
||||
"[+]poststarthook/start-apiextensions-controllers ok",
|
||||
"[+]poststarthook/crd-informer-synced ok",
|
||||
"[+]poststarthook/bootstrap-controller ok",
|
||||
"[+]poststarthook/start-system-namespaces-controller ok",
|
||||
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
|
||||
"[+]poststarthook/start-cluster-authentication-info-controller ok",
|
||||
"[+]poststarthook/start-kube-aggregator-informers ok",
|
||||
|
Loading…
Reference in New Issue
Block a user