Merge pull request #116170 from aojea/watch_instead_poll_system_namespaces

Watch instead poll system namespaces
This commit is contained in:
Kubernetes Prow Robot 2023-03-11 11:24:39 -08:00 committed by GitHub
commit cc3855e0cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 273 additions and 24 deletions

View File

@ -49,9 +49,8 @@ const (
)
// Controller is the controller manager for the core bootstrap Kubernetes
// controller loops, which manage creating the "kubernetes" service, the
// "default", "kube-system" and "kube-public" namespaces, and provide the IP
// repair check on service IPs
// controller loops, which manage creating the "kubernetes" service and
// provide the IP repair check on service IPs
type Controller struct {
client kubernetes.Interface
@ -69,9 +68,6 @@ type Controller struct {
EndpointReconciler reconcilers.EndpointReconciler
EndpointInterval time.Duration
SystemNamespaces []string
SystemNamespacesInterval time.Duration
PublicIP net.IP
// ServiceIP indicates where the kubernetes service will live. It may not be nil.
@ -101,17 +97,12 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
}
}
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
return &Controller{
client: client,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
SystemNamespaces: systemNamespaces,
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
@ -181,7 +172,7 @@ func (c *Controller) Start() {
repairNodePorts.RunUntil(wg.Done, stopCh)
}
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
c.runner.Start()
// For backward compatibility, we ensure that if we never are able
@ -225,18 +216,6 @@ func (c *Controller) Stop() {
}
}
// RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() {
// Loop the system namespace list, and create them if they do not exist
for _, ns := range c.SystemNamespaces {
if err := createNamespaceIfNeeded(c.client.CoreV1(), ns); err != nil {
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
}
}
}, c.SystemNamespacesInterval, ch)
}
// RunKubernetesService periodically updates the kubernetes service
func (c *Controller) RunKubernetesService(ch chan struct{}) {
// wait until process is ready

View File

@ -0,0 +1,104 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemnamespaces
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// Controller ensure system namespaces exist.
type Controller struct {
client kubernetes.Interface
namespaceLister listers.NamespaceLister
namespaceSynced cache.InformerSynced
systemNamespaces []string
interval time.Duration
}
// NewController creates a new Controller to ensure system namespaces exist.
func NewController(clientset kubernetes.Interface, namespaceInformer coreinformers.NamespaceInformer) *Controller {
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease, metav1.NamespaceDefault}
interval := 1 * time.Minute
return &Controller{
client: clientset,
namespaceLister: namespaceInformer.Lister(),
namespaceSynced: namespaceInformer.Informer().HasSynced,
systemNamespaces: systemNamespaces,
interval: interval,
}
}
// Run starts one worker.
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer klog.Infof("Shutting down system namespaces controller")
klog.Infof("Starting system namespaces controller")
if !cache.WaitForCacheSync(stopCh, c.namespaceSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
go wait.Until(c.sync, c.interval, stopCh)
<-stopCh
}
func (c *Controller) sync() {
// Loop the system namespace list, and create them if they do not exist
for _, ns := range c.systemNamespaces {
if err := c.createNamespaceIfNeeded(ns); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create required kubernetes system Namespace %s: %v", ns, err))
}
}
}
func (c *Controller) createNamespaceIfNeeded(ns string) error {
if _, err := c.namespaceLister.Get(ns); err == nil {
// the namespace already exists
return nil
}
newNs := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
Namespace: "",
},
}
_, err := c.client.CoreV1().Namespaces().Create(context.TODO(), newNs, metav1.CreateOptions{})
if err != nil && errors.IsAlreadyExists(err) {
err = nil
}
return err
}

View File

@ -0,0 +1,155 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemnamespaces
import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/gengo/examples/set-gen/sets"
)
// Test_Controller validates the garbage collection logic for the apiserverleasegc controller.
func Test_Controller(t *testing.T) {
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease}
tests := []struct {
name string
namespaces []string
actions [][]string // verb and resource
}{
{
name: "no system namespaces",
actions: [][]string{
{"create", "namespaces"},
{"create", "namespaces"},
{"create", "namespaces"},
{"create", "namespaces"},
},
},
{
name: "no system namespaces but others",
namespaces: []string{"foo", "bar"},
actions: [][]string{
{"create", "namespaces"},
{"create", "namespaces"},
{"create", "namespaces"},
{"create", "namespaces"},
},
},
{
name: "one system namespace",
namespaces: []string{metav1.NamespaceSystem},
actions: [][]string{
{"create", "namespaces"},
{"create", "namespaces"},
{"create", "namespaces"},
},
},
{
name: "two system namespaces",
namespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic},
actions: [][]string{
{"create", "namespaces"},
{"create", "namespaces"},
},
},
{
name: "three namespaces",
namespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease},
actions: [][]string{
{"create", "namespaces"},
},
},
{
name: "the four namespaces",
namespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic, v1.NamespaceNodeLease, v1.NamespaceDefault},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
objs := []runtime.Object{}
for _, ns := range test.namespaces {
objs = append(objs,
&v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
Namespace: "",
},
},
)
}
clientset := fake.NewSimpleClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
namespaceInformer := informerFactory.Core().V1().Namespaces()
for _, obj := range objs {
namespaceInformer.Informer().GetIndexer().Add(obj)
}
controller := NewController(clientset, namespaceInformer)
clientset.PrependReactor("create", "namespaces", func(action k8stesting.Action) (bool, runtime.Object, error) {
create := action.(k8stesting.CreateAction)
namespaceInformer.Informer().GetIndexer().Add(create.GetObject())
return true, create.GetObject(), nil
})
controller.sync()
expectAction(t, clientset.Actions(), test.actions)
namespaces, err := controller.namespaceLister.List(labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
got := sets.NewString()
for _, ns := range namespaces {
got.Insert(ns.Name)
}
if !got.HasAll(systemNamespaces...) {
t.Errorf("unexpected namespaces: %v", got.List())
}
})
}
}
func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) {
t.Helper()
if len(actions) != len(expected) {
t.Fatalf("Expected at least %d actions, got %d", len(expected), len(actions))
}
for i, action := range actions {
verb := expected[i][0]
if action.GetVerb() != verb {
t.Errorf("Expected action %d verb to be %s, got %s", i, verb, action.GetVerb())
}
resource := expected[i][1]
if action.GetResource().Resource != resource {
t.Errorf("Expected action %d resource to be %s, got %s", i, resource, action.GetResource().Resource)
}
}
}

View File

@ -85,6 +85,7 @@ import (
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
@ -600,6 +601,13 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
controllerName := "bootstrap-controller"
client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
// Kubernetes clusters contains the following system namespaces:
// kube-system, kube-node-lease, kube-public, default
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(client, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client)
if err != nil {
return fmt.Errorf("error creating bootstrap controller: %v", err)

View File

@ -43,6 +43,7 @@ var (
"[+]poststarthook/start-apiextensions-controllers ok",
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
@ -62,6 +63,7 @@ var (
"[+]poststarthook/start-apiextensions-controllers ok",
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
@ -82,6 +84,7 @@ var (
"[+]poststarthook/start-apiextensions-controllers ok",
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",