mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #46147 from nicksardo/gce-cluster-id
Automatic merge from submit-queue (batch tested with PRs 45891, 46147) Watching ClusterId from within GCE cloud provider **What this PR does / why we need it**: Adds the ability for the GCE cloud provider to watch a config map for `clusterId` and `providerId`. WIP - still needs more testing cc @MrHohn @csbell @madhusudancs @thockin @bowei @nikhiljindal **Release note**: ```release-note NONE ```
This commit is contained in:
commit
c1c7365e7c
@ -28,7 +28,8 @@ import (
|
||||
|
||||
// Interface is an abstract, pluggable interface for cloud providers.
|
||||
type Interface interface {
|
||||
// Initialize provides the cloud with a kubernetes client builder
|
||||
// Initialize provides the cloud with a kubernetes client builder and may spawn goroutines
|
||||
// to perform housekeeping activities within the cloud provider.
|
||||
Initialize(clientBuilder controller.ControllerClientBuilder)
|
||||
// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
|
||||
LoadBalancer() (LoadBalancer, bool)
|
||||
|
@ -15,6 +15,7 @@ go_library(
|
||||
"gce.go",
|
||||
"gce_backendservice.go",
|
||||
"gce_cert.go",
|
||||
"gce_clusterid.go",
|
||||
"gce_clusters.go",
|
||||
"gce_disks.go",
|
||||
"gce_firewall.go",
|
||||
@ -37,6 +38,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/api/v1/service:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/util/net/sets:go_default_library",
|
||||
@ -52,10 +54,14 @@ go_library(
|
||||
"//vendor/google.golang.org/api/googleapi:go_default_library",
|
||||
"//vendor/gopkg.in/gcfg.v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -79,6 +79,8 @@ type GCECloud struct {
|
||||
service *compute.Service
|
||||
serviceBeta *computebeta.Service
|
||||
containerService *container.Service
|
||||
clientBuilder controller.ControllerClientBuilder
|
||||
ClusterId ClusterId
|
||||
projectID string
|
||||
region string
|
||||
localZone string // The zone in which we are running
|
||||
@ -234,8 +236,12 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
|
||||
func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) {}
|
||||
// Initialize takes in a clientBuilder and spawns a goroutine for watching the clusterid configmap.
|
||||
// This must be called before utilizing the funcs of gce.ClusterId
|
||||
func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) {
|
||||
gce.clientBuilder = clientBuilder
|
||||
go gce.watchClusterId()
|
||||
}
|
||||
|
||||
// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.
|
||||
func (gce *GCECloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
|
||||
|
258
pkg/cloudprovider/providers/gce/gce_clusterid.go
Normal file
258
pkg/cloudprovider/providers/gce/gce_clusterid.go
Normal file
@ -0,0 +1,258 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package gce
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
)
|
||||
|
||||
const (
|
||||
// Key used to persist UIDs to configmaps.
|
||||
UIDConfigMapName = "ingress-uid"
|
||||
// Namespace which contains the above config map
|
||||
UIDNamespace = metav1.NamespaceSystem
|
||||
// Data keys for the specific ids
|
||||
UIDCluster = "uid"
|
||||
UIDProvider = "provider-uid"
|
||||
UIDLengthBytes = 8
|
||||
// Frequency of the updateFunc event handler being called
|
||||
// This does not actually query the apiserver for current state - the local cache value is used.
|
||||
updateFuncFrequency = 10 * time.Minute
|
||||
)
|
||||
|
||||
type ClusterId struct {
|
||||
idLock sync.RWMutex
|
||||
client clientset.Interface
|
||||
cfgMapKey string
|
||||
store cache.Store
|
||||
providerId *string
|
||||
clusterId *string
|
||||
}
|
||||
|
||||
// Continually watches for changes to the cluser id config map
|
||||
func (gce *GCECloud) watchClusterId() {
|
||||
gce.ClusterId = ClusterId{
|
||||
cfgMapKey: fmt.Sprintf("%v/%v", UIDNamespace, UIDConfigMapName),
|
||||
client: gce.clientBuilder.ClientOrDie("cloud-provider"),
|
||||
}
|
||||
|
||||
mapEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
m, ok := obj.(*v1.ConfigMap)
|
||||
if !ok || m == nil {
|
||||
glog.Errorf("Expected v1.ConfigMap, item=%+v, typeIsOk=%v", obj, ok)
|
||||
return
|
||||
}
|
||||
if m.Namespace != UIDNamespace ||
|
||||
m.Name != UIDConfigMapName {
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Observed new configmap for clusterid: %v, %v; setting local values", m.Name, m.Data)
|
||||
gce.ClusterId.setIds(m)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
m, ok := cur.(*v1.ConfigMap)
|
||||
if !ok || m == nil {
|
||||
glog.Errorf("Expected v1.ConfigMap, item=%+v, typeIsOk=%v", cur, ok)
|
||||
return
|
||||
}
|
||||
|
||||
if m.Namespace != UIDNamespace ||
|
||||
m.Name != UIDConfigMapName {
|
||||
return
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(old, cur) {
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Observed updated configmap for clusterid %v, %v; setting local values", m.Name, m.Data)
|
||||
gce.ClusterId.setIds(m)
|
||||
},
|
||||
}
|
||||
|
||||
listerWatcher := cache.NewListWatchFromClient(gce.ClusterId.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything())
|
||||
var controller cache.Controller
|
||||
gce.ClusterId.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler)
|
||||
|
||||
controller.Run(nil)
|
||||
}
|
||||
|
||||
// GetId returns the id which is unique to this cluster
|
||||
// if federated, return the provider id (unique to the cluster)
|
||||
// if not federated, return the cluster id
|
||||
func (ci *ClusterId) GetId() (string, error) {
|
||||
if err := ci.getOrInitialize(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
ci.idLock.RLock()
|
||||
defer ci.idLock.RUnlock()
|
||||
if ci.clusterId == nil {
|
||||
return "", errors.New("Could not retrieve cluster id")
|
||||
}
|
||||
|
||||
// If provider ID is set, (Federation is enabled) use this field
|
||||
if ci.providerId != nil && *ci.providerId != *ci.clusterId {
|
||||
return *ci.providerId, nil
|
||||
}
|
||||
|
||||
// providerId is not set, use the cluster id
|
||||
return *ci.clusterId, nil
|
||||
}
|
||||
|
||||
// GetFederationId returns the id which could represent the entire Federation
|
||||
// or just the cluster if not federated.
|
||||
func (ci *ClusterId) GetFederationId() (string, bool, error) {
|
||||
if err := ci.getOrInitialize(); err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
ci.idLock.RLock()
|
||||
defer ci.idLock.RUnlock()
|
||||
if ci.clusterId == nil {
|
||||
return "", false, errors.New("Could not retrieve cluster id")
|
||||
}
|
||||
|
||||
// If provider ID is not set, return false
|
||||
if ci.providerId == nil || *ci.clusterId == *ci.providerId {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
return *ci.clusterId, true, nil
|
||||
}
|
||||
|
||||
// getOrInitialize either grabs the configmaps current value or defines the value
|
||||
// and sets the configmap. This is for the case of the user calling GetClusterId()
|
||||
// before the watch has begun.
|
||||
func (ci *ClusterId) getOrInitialize() error {
|
||||
if ci.store == nil {
|
||||
return errors.New("GCECloud.ClusterId is not ready. Call Initialize() before using.")
|
||||
}
|
||||
|
||||
if ci.clusterId != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
exists, err := ci.getConfigMap()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
// The configmap does not exist - let's try creating one.
|
||||
newId, err := makeUID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Creating clusterid: %v", newId)
|
||||
cfg := &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: UIDConfigMapName,
|
||||
Namespace: UIDNamespace,
|
||||
},
|
||||
}
|
||||
cfg.Data = map[string]string{
|
||||
UIDCluster: newId,
|
||||
UIDProvider: newId,
|
||||
}
|
||||
|
||||
if _, err := ci.client.Core().ConfigMaps(UIDNamespace).Create(cfg); err != nil {
|
||||
glog.Errorf("GCE cloud provider failed to create %v config map to store cluster id: %v", ci.cfgMapKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Created a config map containing clusterid: %v", newId)
|
||||
ci.setIds(cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ci *ClusterId) getConfigMap() (bool, error) {
|
||||
item, exists, err := ci.store.GetByKey(ci.cfgMapKey)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !exists {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
m, ok := item.(*v1.ConfigMap)
|
||||
if !ok || m == nil {
|
||||
err = fmt.Errorf("Expected v1.ConfigMap, item=%+v, typeIsOk=%v", item, ok)
|
||||
glog.Error(err)
|
||||
return false, err
|
||||
}
|
||||
ci.setIds(m)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (ci *ClusterId) setIds(m *v1.ConfigMap) {
|
||||
ci.idLock.Lock()
|
||||
defer ci.idLock.Unlock()
|
||||
if clusterId, exists := m.Data[UIDCluster]; exists {
|
||||
ci.clusterId = &clusterId
|
||||
}
|
||||
if provId, exists := m.Data[UIDProvider]; exists {
|
||||
ci.providerId = &provId
|
||||
}
|
||||
}
|
||||
|
||||
func makeUID() (string, error) {
|
||||
b := make([]byte, UIDLengthBytes)
|
||||
_, err := rand.Read(b)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hex.EncodeToString(b), nil
|
||||
}
|
||||
|
||||
func newSingleObjectListerWatcher(lw cache.ListerWatcher, objectName string) *singleObjListerWatcher {
|
||||
return &singleObjListerWatcher{lw: lw, objectName: objectName}
|
||||
}
|
||||
|
||||
type singleObjListerWatcher struct {
|
||||
lw cache.ListerWatcher
|
||||
objectName string
|
||||
}
|
||||
|
||||
func (sow *singleObjListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = "metadata.name=" + sow.objectName
|
||||
return sow.lw.List(options)
|
||||
}
|
||||
|
||||
func (sow *singleObjListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = "metadata.name=" + sow.objectName
|
||||
return sow.lw.Watch(options)
|
||||
}
|
@ -85,6 +85,13 @@ func init() {
|
||||
rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("secrets").RuleOrDie(),
|
||||
},
|
||||
})
|
||||
addNamespaceRole(metav1.NamespaceSystem, rbac.Role{
|
||||
// role for the cloud providers to access/create kube-system configmaps
|
||||
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "cloud-provider"},
|
||||
Rules: []rbac.PolicyRule{
|
||||
rbac.NewRule("create", "get", "list", "watch").Groups(legacyGroup).Resources("configmaps").RuleOrDie(),
|
||||
},
|
||||
})
|
||||
addNamespaceRole(metav1.NamespaceSystem, rbac.Role{
|
||||
// role for the token-cleaner to be able to remove secrets, but only in kube-system
|
||||
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "token-cleaner"},
|
||||
@ -95,6 +102,8 @@ func init() {
|
||||
})
|
||||
addNamespaceRoleBinding(metav1.NamespaceSystem,
|
||||
rbac.NewRoleBinding(saRolePrefix+"bootstrap-signer", metav1.NamespaceSystem).SAs(metav1.NamespaceSystem, "bootstrap-signer").BindingOrDie())
|
||||
addNamespaceRoleBinding(metav1.NamespaceSystem,
|
||||
rbac.NewRoleBinding(saRolePrefix+"cloud-provider", metav1.NamespaceSystem).SAs(metav1.NamespaceSystem, "cloud-provider").BindingOrDie())
|
||||
addNamespaceRoleBinding(metav1.NamespaceSystem,
|
||||
rbac.NewRoleBinding(saRolePrefix+"token-cleaner", metav1.NamespaceSystem).SAs(metav1.NamespaceSystem, "token-cleaner").BindingOrDie())
|
||||
|
||||
|
@ -36,6 +36,24 @@ items:
|
||||
- kind: ServiceAccount
|
||||
name: bootstrap-signer
|
||||
namespace: kube-system
|
||||
- apiVersion: rbac.authorization.k8s.io/v1beta1
|
||||
kind: RoleBinding
|
||||
metadata:
|
||||
annotations:
|
||||
rbac.authorization.kubernetes.io/autoupdate: "true"
|
||||
creationTimestamp: null
|
||||
labels:
|
||||
kubernetes.io/bootstrapping: rbac-defaults
|
||||
name: system:controller:cloud-provider
|
||||
namespace: kube-system
|
||||
roleRef:
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
kind: Role
|
||||
name: system:controller:cloud-provider
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: cloud-provider
|
||||
namespace: kube-system
|
||||
- apiVersion: rbac.authorization.k8s.io/v1beta1
|
||||
kind: RoleBinding
|
||||
metadata:
|
||||
|
@ -73,6 +73,26 @@ items:
|
||||
- get
|
||||
- list
|
||||
- watch
|
||||
- apiVersion: rbac.authorization.k8s.io/v1beta1
|
||||
kind: Role
|
||||
metadata:
|
||||
annotations:
|
||||
rbac.authorization.kubernetes.io/autoupdate: "true"
|
||||
creationTimestamp: null
|
||||
labels:
|
||||
kubernetes.io/bootstrapping: rbac-defaults
|
||||
name: system:controller:cloud-provider
|
||||
namespace: kube-system
|
||||
rules:
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- configmaps
|
||||
verbs:
|
||||
- create
|
||||
- get
|
||||
- list
|
||||
- watch
|
||||
- apiVersion: rbac.authorization.k8s.io/v1beta1
|
||||
kind: Role
|
||||
metadata:
|
||||
|
Loading…
Reference in New Issue
Block a user