1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-02 15:34:36 +00:00

add the parsing for eventRateLimit

This commit is contained in:
Jiaqi Luo
2022-10-31 12:48:44 -07:00
parent 9244d9ffae
commit f5e18110b6
7 changed files with 79 additions and 168 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"net"
"os"
"reflect"
@@ -37,6 +38,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport"
eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
)
type Cluster struct {
@@ -374,11 +376,11 @@ func parseAuditLogConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngi
if services["kube-api"] == nil {
return nil
}
kubeapi := services["kube-api"].(map[string]interface{})
if kubeapi["audit_log"] == nil {
kubeAPI := services["kube-api"].(map[string]interface{})
if kubeAPI["audit_log"] == nil {
return nil
}
auditlog := kubeapi["audit_log"].(map[string]interface{})
auditlog := kubeAPI["audit_log"].(map[string]interface{})
if auditlog["configuration"] == nil {
return nil
}
@@ -405,6 +407,39 @@ func parseAuditLogConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngi
return err
}
func parseEventRateLimit(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
if rkeConfig.Services.KubeAPI.EventRateLimit == nil || !rkeConfig.Services.KubeAPI.EventRateLimit.Enabled {
return nil
}
logrus.Debugf("event rate limit is found in cluster.yml")
var r map[string]interface{}
err := ghodssyaml.Unmarshal([]byte(clusterFile), &r)
if err != nil {
return fmt.Errorf("error unmarshalling: %v", err)
}
if r["services"] == nil {
return nil
}
cfg, found, err := unstructured.NestedMap(r, "services", "kube-api", "event_rate_limit", "configuration")
if err != nil {
return err
}
if !found {
return nil
}
cfgBytes, err := json.Marshal(cfg)
if err != nil {
return fmt.Errorf("error marshalling eventRateLimit: %v", err)
}
output := eventratelimitapi.Configuration{}
err = json.Unmarshal(cfgBytes, &output)
if err != nil {
return fmt.Errorf("error decoding eventRateLimit: %v", err)
}
rkeConfig.Services.KubeAPI.EventRateLimit.Configuration = &output
return err
}
func parseAdmissionConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
if rkeConfig.Services.KubeAPI.AdmissionConfiguration == nil {
return nil
@@ -685,6 +720,9 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
if err := parseAuditLogConfig(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing audit log config: %v", err)
}
if err := parseEventRateLimit(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing event rate limit config: %v", err)
}
if err := parseIngressConfig(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing ingress config: %v", err)
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
"strings"
"github.com/blang/semver"
@@ -242,7 +243,7 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e
if len(c.Monitoring.Provider) == 0 {
c.Monitoring.Provider = DefaultMonitoringProvider
}
//set docker private registry URL
// set docker private registry URL
for _, pr := range c.PrivateRegistries {
if pr.URL == "" {
pr.URL = docker.DockerRegistryURL
@@ -298,7 +299,7 @@ func (c *Cluster) setNodeUpgradeStrategy() {
IgnoreDaemonSets: &DefaultNodeDrainIgnoreDaemonsets,
// default to 120 seems to work better for controlplane nodes
Timeout: DefaultNodeDrainTimeout,
//Period of time in seconds given to each pod to terminate gracefully.
// Period of time in seconds given to each pod to terminate gracefully.
// If negative, the default value specified in the pod will be used
GracePeriod: DefaultNodeDrainGracePeriod,
}
@@ -437,7 +438,7 @@ func newDefaultAuditLogConfig() *v3.AuditLogConfig {
return c
}
func getEventRateLimitPluginFromConfig(c *v3.Configuration) (apiserverv1.AdmissionPluginConfiguration, error) {
func getEventRateLimitPluginFromConfig(c *eventratelimitapi.Configuration) (apiserverv1.AdmissionPluginConfiguration, error) {
plugin := apiserverv1.AdmissionPluginConfiguration{
Name: EventRateLimitPluginName,
Configuration: &runtime.Unknown{
@@ -454,15 +455,15 @@ func getEventRateLimitPluginFromConfig(c *v3.Configuration) (apiserverv1.Admissi
return plugin, nil
}
func newDefaultEventRateLimitConfig() *v3.Configuration {
return &v3.Configuration{
func newDefaultEventRateLimitConfig() *eventratelimitapi.Configuration {
return &eventratelimitapi.Configuration{
TypeMeta: v1.TypeMeta{
Kind: "Configuration",
APIVersion: "eventratelimit.admission.k8s.io/v1alpha1",
},
Limits: []v3.Limit{
Limits: []eventratelimitapi.Limit{
{
Type: v3.ServerLimitType,
Type: eventratelimitapi.ServerLimitType,
QPS: 5000,
Burst: 20000,
},

1
go.mod
View File

@@ -39,6 +39,7 @@ require (
k8s.io/client-go v0.25.3
k8s.io/gengo v0.0.0-20211129171323-c02415ce4185
k8s.io/kubectl v0.25.3
k8s.io/kubernetes v1.13.0
sigs.k8s.io/yaml v1.2.0
)

1
go.sum
View File

@@ -1803,6 +1803,7 @@ k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHU
k8s.io/kubectl v0.0.0-20191219154910-1528d4eea6dd/go.mod h1:9ehGcuUGjXVZh0qbYSB0vvofQw2JQe6c6cO0k4wu/Oo=
k8s.io/kubectl v0.25.3 h1:HnWJziEtmsm4JaJiKT33kG0kadx68MXxUE8UEbXnN4U=
k8s.io/kubectl v0.25.3/go.mod h1:glU7PiVj/R6Ud4A9FJdTcJjyzOtCJyc0eO7Mrbh3jlI=
k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8=
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/metrics v0.0.0-20191214191643-6b1944c9f765/go.mod h1:5V7rewilItwK0cz4nomU0b3XCcees2Ka5EBYWS1HBeM=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=

View File

@@ -1,85 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// LimitType is the type of the limit (e.g., per-namespace)
type LimitType string
const (
// ServerLimitType is a type of limit where there is one bucket shared by
// all of the event queries received by the API Server.
ServerLimitType LimitType = "Server"
// NamespaceLimitType is a type of limit where there is one bucket used by
// each namespace
NamespaceLimitType LimitType = "Namespace"
// UserLimitType is a type of limit where there is one bucket used by each
// user
UserLimitType LimitType = "User"
// SourceAndObjectLimitType is a type of limit where there is one bucket used
// by each combination of source and involved object of the event.
SourceAndObjectLimitType LimitType = "SourceAndObject"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Configuration provides configuration for the EventRateLimit admission
// controller.
type Configuration struct {
metav1.TypeMeta `json:",inline"`
// limits are the limits to place on event queries received.
// Limits can be placed on events received server-wide, per namespace,
// per user, and per source+object.
// At least one limit is required.
Limits []Limit `json:"limits"`
}
// Limit is the configuration for a particular limit type
type Limit struct {
// type is the type of limit to which this configuration applies
Type LimitType `json:"type"`
// qps is the number of event queries per second that are allowed for this
// type of limit. The qps and burst fields are used together to determine if
// a particular event query is accepted. The qps determines how many queries
// are accepted once the burst amount of queries has been exhausted.
QPS int32 `json:"qps"`
// burst is the burst number of event queries that are allowed for this type
// of limit. The qps and burst fields are used together to determine if a
// particular event query is accepted. The burst determines the maximum size
// of the allowance granted for a particular bucket. For example, if the burst
// is 10 and the qps is 3, then the admission control will accept 10 queries
// before blocking any queries. Every second, 3 more queries will be allowed.
// If some of that allowance is not used, then it will roll over to the next
// second, until the maximum allowance of 10 is reached.
Burst int32 `json:"burst"`
// cacheSize is the size of the LRU cache for this type of limit. If a bucket
// is evicted from the cache, then the allowance for that bucket is reset. If
// more queries are later received for an evicted bucket, then that bucket
// will re-enter the cache with a clean slate, giving that bucket a full
// allowance of burst queries.
//
// The default cache size is 4096.
//
// If limitType is 'server', then cacheSize is ignored.
// +optional
CacheSize int32 `json:"cacheSize,omitempty"`
}

View File

@@ -6,6 +6,7 @@ import (
apiserverv1 "k8s.io/apiserver/pkg/apis/apiserver/v1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
configv1 "k8s.io/apiserver/pkg/apis/config/v1"
eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
)
type RancherKubernetesEngineConfig struct {
@@ -153,7 +154,7 @@ type RKESystemImages struct {
CalicoControllers string `yaml:"calico_controllers" json:"calicoControllers,omitempty"`
// Calicoctl image
CalicoCtl string `yaml:"calico_ctl" json:"calicoCtl,omitempty"`
//CalicoFlexVol image
// CalicoFlexVol image
CalicoFlexVol string `yaml:"calico_flexvol" json:"calicoFlexVol,omitempty"`
// Canal Node Image
CanalNode string `yaml:"canal_node" json:"canalNode,omitempty"`
@@ -161,11 +162,11 @@ type RKESystemImages struct {
CanalCNI string `yaml:"canal_cni" json:"canalCni,omitempty"`
// Canal Controllers Image needed for Calico/Canal v3.14.0+
CanalControllers string `yaml:"canal_controllers" json:"canalControllers,omitempty"`
//CanalFlannel image
// CanalFlannel image
CanalFlannel string `yaml:"canal_flannel" json:"canalFlannel,omitempty"`
//CanalFlexVol image
// CanalFlexVol image
CanalFlexVol string `yaml:"canal_flexvol" json:"canalFlexVol,omitempty"`
//Weave Node image
// Weave Node image
WeaveNode string `yaml:"weave_node" json:"weaveNode,omitempty"`
// Weave CNI image
WeaveCNI string `yaml:"weave_cni" json:"weaveCni,omitempty"`
@@ -308,7 +309,7 @@ type KubeAPIService struct {
type EventRateLimit struct {
Enabled bool `yaml:"enabled" json:"enabled,omitempty"`
Configuration *Configuration `yaml:"configuration" json:"configuration,omitempty" norman:"type=map[json]"`
Configuration *eventratelimitapi.Configuration `yaml:"configuration" json:"configuration,omitempty" norman:"type=map[json]"`
}
type AuditLog struct {
@@ -513,7 +514,7 @@ type Process struct {
Env []string `json:"env,omitempty"`
// Process docker image
Image string `json:"image,omitempty"`
//AuthConfig for image private registry
// AuthConfig for image private registry
ImageRegistryAuthConfig string `json:"imageRegistryAuthConfig,omitempty"`
// Process docker image VolumesFrom
VolumesFrom []string `json:"volumesFrom,omitempty"`
@@ -905,29 +906,29 @@ type GlobalAwsOpts struct {
// KubernetesClusterID is the cluster id we'll use to identify our cluster resources
KubernetesClusterID string `json:"kubernetes-cluster-id" yaml:"kubernetes-cluster-id" ini:"KubernetesClusterID,omitempty"`
//The aws provider creates an inbound rule per load balancer on the node security
//group. However, this can run into the AWS security group rule limit of 50 if
//many LoadBalancers are created.
// The aws provider creates an inbound rule per load balancer on the node security
// group. However, this can run into the AWS security group rule limit of 50 if
// many LoadBalancers are created.
//
//This flag disables the automatic ingress creation. It requires that the user
//has setup a rule that allows inbound traffic on kubelet ports from the
//local VPC subnet (so load balancers can access it). E.g. 10.82.0.0/16 30000-32000.
// This flag disables the automatic ingress creation. It requires that the user
// has setup a rule that allows inbound traffic on kubelet ports from the
// local VPC subnet (so load balancers can access it). E.g. 10.82.0.0/16 30000-32000.
DisableSecurityGroupIngress bool `json:"disable-security-group-ingress" yaml:"disable-security-group-ingress" ini:"DisableSecurityGroupIngress,omitempty"`
//AWS has a hard limit of 500 security groups. For large clusters creating a security group for each ELB
//can cause the max number of security groups to be reached. If this is set instead of creating a new
//Security group for each ELB this security group will be used instead.
// AWS has a hard limit of 500 security groups. For large clusters creating a security group for each ELB
// can cause the max number of security groups to be reached. If this is set instead of creating a new
// Security group for each ELB this security group will be used instead.
ElbSecurityGroup string `json:"elb-security-group" yaml:"elb-security-group" ini:"ElbSecurityGroup,omitempty"`
//During the instantiation of an new AWS cloud provider, the detected region
//is validated against a known set of regions.
// During the instantiation of an new AWS cloud provider, the detected region
// is validated against a known set of regions.
//
//In a non-standard, AWS like environment (e.g. Eucalyptus), this check may
//be undesirable. Setting this to true will disable the check and provide
//a warning that the check was skipped. Please note that this is an
//experimental feature and work-in-progress for the moment. If you find
//yourself in an non-AWS cloud and open an issue, please indicate that in the
//issue body.
// In a non-standard, AWS like environment (e.g. Eucalyptus), this check may
// be undesirable. Setting this to true will disable the check and provide
// a warning that the check was skipped. Please note that this is an
// experimental feature and work-in-progress for the moment. If you find
// yourself in an non-AWS cloud and open an issue, please indicate that in the
// issue body.
DisableStrictZoneCheck bool `json:"disable-strict-zone-check" yaml:"disable-strict-zone-check" ini:"DisableStrictZoneCheck,omitempty"`
}
@@ -1031,7 +1032,7 @@ type NodeDrainInput struct {
IgnoreDaemonSets *bool `yaml:"ignore_daemonsets" json:"ignoreDaemonSets,omitempty" norman:"default=true"`
// Continue even if there are pods using emptyDir
DeleteLocalData bool `yaml:"delete_local_data" json:"deleteLocalData,omitempty"`
//Period of time in seconds given to each pod to terminate gracefully.
// Period of time in seconds given to each pod to terminate gracefully.
// If negative, the default value specified in the pod will be used
GracePeriod int `yaml:"grace_period" json:"gracePeriod,omitempty" norman:"default=-1"`
// Time to wait (in seconds) before giving up for one try

View File

@@ -24,10 +24,10 @@ package types
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
apiserverv1 "k8s.io/apiserver/pkg/apis/apiserver/v1"
v1 "k8s.io/apiserver/pkg/apis/audit/v1"
configv1 "k8s.io/apiserver/pkg/apis/config/v1"
eventratelimit "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -425,36 +425,6 @@ func (in *CloudProvider) DeepCopy() *CloudProvider {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Configuration) DeepCopyInto(out *Configuration) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.Limits != nil {
in, out := &in.Limits, &out.Limits
*out = make([]Limit, len(*in))
copy(*out, *in)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration.
func (in *Configuration) DeepCopy() *Configuration {
if in == nil {
return nil
}
out := new(Configuration)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Configuration) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DNSConfig) DeepCopyInto(out *DNSConfig) {
*out = *in
@@ -697,7 +667,7 @@ func (in *EventRateLimit) DeepCopyInto(out *EventRateLimit) {
*out = *in
if in.Configuration != nil {
in, out := &in.Configuration, &out.Configuration
*out = new(Configuration)
*out = new(eventratelimit.Configuration)
(*in).DeepCopyInto(*out)
}
return
@@ -1118,22 +1088,6 @@ func (in *KubernetesServicesOptions) DeepCopy() *KubernetesServicesOptions {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Limit) DeepCopyInto(out *Limit) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Limit.
func (in *Limit) DeepCopy() *Limit {
if in == nil {
return nil
}
out := new(Limit)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LinearAutoscalerParams) DeepCopyInto(out *LinearAutoscalerParams) {
*out = *in