1
0
mirror of https://github.com/rancher/rke.git synced 2025-04-28 03:31:24 +00:00

auditlog and eventratelimit changes

This commit is contained in:
Murali Paluru 2019-10-30 13:04:16 -07:00 committed by Alena Prokharchyk
parent 995fa72fe2
commit bf8688e709
4 changed files with 334 additions and 7 deletions

View File

@ -2,15 +2,15 @@ package cluster
import (
"context"
"encoding/json"
"fmt"
"net"
"reflect"
"strings"
"time"
"github.com/rancher/rke/pki/cert"
"github.com/docker/docker/api/types"
ghodssyaml "github.com/ghodss/yaml"
"github.com/rancher/rke/authz"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
@ -18,6 +18,7 @@ import (
"github.com/rancher/rke/log"
"github.com/rancher/rke/metadata"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/pki/cert"
"github.com/rancher/rke/services"
"github.com/rancher/rke/util"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
@ -25,6 +26,10 @@ import (
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
apiserverv1alpha1 "k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport"
@ -151,6 +156,82 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
return nil
}
func parseAuditLogConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
if rkeConfig.Services.KubeAPI.AuditLog != nil &&
rkeConfig.Services.KubeAPI.AuditLog.Enabled &&
rkeConfig.Services.KubeAPI.AuditLog.Configuration.Policy == nil {
return nil
}
logrus.Debugf("audit log policy found in cluster.yml")
var err error
var r map[string]interface{}
err = ghodssyaml.Unmarshal([]byte(clusterFile), &r)
if err != nil {
return fmt.Errorf("error unmarshalling: %v", err)
}
services := r["services"].(map[string]interface{})
kubeapi := services["kube-api"].(map[string]interface{})
auditlog := kubeapi["audit_log"].(map[string]interface{})
alc := auditlog["configuration"].(map[string]interface{})
policyBytes, err := json.Marshal(alc["policy"])
if err != nil {
return fmt.Errorf("error marshalling audit policy: %v", err)
}
scheme := runtime.NewScheme()
err = auditv1.AddToScheme(scheme)
if err != nil {
return fmt.Errorf("error adding to scheme: %v", err)
}
codecs := serializer.NewCodecFactory(scheme)
p := auditv1.Policy{}
err = runtime.DecodeInto(codecs.UniversalDecoder(), policyBytes, &p)
if err != nil || p.Kind != "Policy" {
return fmt.Errorf("error decoding audit policy: %v", err)
}
rkeConfig.Services.KubeAPI.AuditLog.Configuration.Policy = &p
return err
}
func parseAdmissionConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
if rkeConfig.Services.KubeAPI.AdmissionConfiguration == nil {
return nil
}
logrus.Debugf("admission configuration found in cluster.yml")
var err error
var r map[string]interface{}
err = ghodssyaml.Unmarshal([]byte(clusterFile), &r)
if err != nil {
return fmt.Errorf("error unmarshalling: %v", err)
}
services := r["services"].(map[string]interface{})
kubeapi := services["kube-api"].(map[string]interface{})
data, err := json.Marshal(kubeapi["admission_configuration"])
if err != nil {
return fmt.Errorf("error marshalling admission configuration: %v", err)
}
scheme := runtime.NewScheme()
err = apiserverv1alpha1.AddToScheme(scheme)
if err != nil {
return fmt.Errorf("error adding to scheme: %v", err)
}
err = scheme.SetVersionPriority(apiserverv1alpha1.SchemeGroupVersion)
if err != nil {
return fmt.Errorf("error setting version priority: %v", err)
}
codecs := serializer.NewCodecFactory(scheme)
decoder := codecs.UniversalDecoder(apiserverv1alpha1.SchemeGroupVersion)
decodedObj, err := runtime.Decode(decoder, data)
if err != nil {
return fmt.Errorf("error decoding data: %v", err)
}
decodedConfig, ok := decodedObj.(*apiserverv1alpha1.AdmissionConfiguration)
if !ok {
return fmt.Errorf("unexpected type: %T", decodedObj)
}
rkeConfig.Services.KubeAPI.AdmissionConfiguration = decodedConfig
return nil
}
func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) {
logrus.Debugf("Parsing cluster file [%v]", clusterFile)
var rkeConfig v3.RancherKubernetesEngineConfig
@ -170,6 +251,12 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
if isEncryptionEnabled(&rkeConfig) && secretConfig != nil {
rkeConfig.Services.KubeAPI.SecretsEncryptionConfig.CustomConfig = secretConfig
}
if err := parseAdmissionConfig(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing admission config: %v", err)
}
if err := parseAuditLogConfig(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing audit log config: %v", err)
}
return &rkeConfig, nil
}

View File

@ -2,20 +2,25 @@ package cluster
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/rancher/rke/metadata"
"github.com/rancher/rke/cloudprovider"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/metadata"
"github.com/rancher/rke/services"
"github.com/rancher/rke/templates"
"github.com/rancher/rke/util"
"github.com/rancher/types/apis/management.cattle.io/v3"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apiserverv1alpha1 "k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
eventratelimitv1alpha1 "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1"
)
const (
@ -58,6 +63,20 @@ const (
DefaultFlannelBackendVxLan = "vxlan"
DefaultFlannelBackendVxLanPort = "8472"
DefaultFlannelBackendVxLanVNI = "1"
KubeAPIArgAdmissionControlConfigFile = "admission-control-config-file"
DefaultKubeAPIArgAdmissionControlConfigFileValue = "/etc/kubernetes/admission.yaml"
EventRateLimitPluginName = "EventRateLimit"
KubeAPIArgAuditLogPath = "audit-log-path"
KubeAPIArgAuditLogMaxAge = "audit-log-maxage"
KubeAPIArgAuditLogMaxBackup = "audit-log-maxbackup"
KubeAPIArgAuditLogMaxSize = "audit-log-maxsize"
KubeAPIArgAuditLogFormat = "audit-log-format"
KubeAPIArgAuditPolicyFile = "audit-policy-file"
DefaultKubeAPIArgAuditLogPathValue = "/var/log/kube-audit/audit-log.json"
DefaultKubeAPIArgAuditPolicyFileValue = "/etc/kubernetes/audit.yaml"
)
type ExternalFlags struct {
@ -224,6 +243,117 @@ func (c *Cluster) setClusterServicesDefaults() {
c.Services.Etcd.BackupConfig.Retention = DefaultEtcdBackupConfigRetention
}
}
if _, ok := c.Services.KubeAPI.ExtraArgs[KubeAPIArgAdmissionControlConfigFile]; !ok {
if c.Services.KubeAPI.EventRateLimit != nil &&
c.Services.KubeAPI.EventRateLimit.Enabled &&
c.Services.KubeAPI.EventRateLimit.Configuration == nil {
c.Services.KubeAPI.EventRateLimit.Configuration = newDefaultEventRateLimitConfig()
}
}
if c.Services.KubeAPI.AuditLog != nil &&
c.Services.KubeAPI.AuditLog.Enabled {
if c.Services.KubeAPI.AuditLog.Configuration == nil {
alc := newDefaultAuditLogConfig()
c.Services.KubeAPI.AuditLog.Configuration = alc
} else {
if c.Services.KubeAPI.AuditLog.Configuration.Policy == nil {
c.Services.KubeAPI.AuditLog.Configuration.Policy = newDefaultAuditPolicy()
}
}
}
}
func newDefaultAuditPolicy() *auditv1.Policy {
p := &auditv1.Policy{
TypeMeta: v1.TypeMeta{
Kind: "Policy",
APIVersion: auditv1.SchemeGroupVersion.String(),
},
Rules: []auditv1.PolicyRule{
{
Level: "Metadata",
},
},
OmitStages: nil,
}
return p
}
func newDefaultAuditLogConfig() *v3.AuditLogConfig {
p := newDefaultAuditPolicy()
c := &v3.AuditLogConfig{
MaxAge: 5,
MaxBackup: 5,
MaxSize: 100,
Path: DefaultKubeAPIArgAuditLogPathValue,
Format: "json",
Policy: p,
}
return c
}
func getEventRateLimitPluginFromConfig(c *eventratelimitv1alpha1.Configuration) (apiserverv1alpha1.AdmissionPluginConfiguration, error) {
plugin := apiserverv1alpha1.AdmissionPluginConfiguration{
Name: EventRateLimitPluginName,
Configuration: &runtime.Unknown{
ContentType: "application/json",
},
}
cBytes, err := json.Marshal(c)
if err != nil {
return plugin, fmt.Errorf("error marshalling eventratelimit config: %v", err)
}
plugin.Configuration.Raw = cBytes
return plugin, nil
}
func newDefaultEventRateLimitConfig() *eventratelimitv1alpha1.Configuration {
return &eventratelimitv1alpha1.Configuration{
TypeMeta: v1.TypeMeta{
Kind: "Configuration",
APIVersion: eventratelimitv1alpha1.SchemeGroupVersion.String(),
},
Limits: []eventratelimitv1alpha1.Limit{
{
Type: eventratelimitv1alpha1.ServerLimitType,
QPS: 5000,
Burst: 20000,
},
},
}
}
func newDefaultEventRateLimitPlugin() (apiserverv1alpha1.AdmissionPluginConfiguration, error) {
plugin := apiserverv1alpha1.AdmissionPluginConfiguration{
Name: EventRateLimitPluginName,
Configuration: &runtime.Unknown{
ContentType: "application/json",
},
}
c := newDefaultEventRateLimitConfig()
cBytes, err := json.Marshal(c)
if err != nil {
return plugin, fmt.Errorf("error marshalling eventratelimit config: %v", err)
}
plugin.Configuration.Raw = cBytes
return plugin, nil
}
func newDefaultAdmissionConfiguration() (*apiserverv1alpha1.AdmissionConfiguration, error) {
var admissionConfiguration *apiserverv1alpha1.AdmissionConfiguration
admissionConfiguration = &apiserverv1alpha1.AdmissionConfiguration{
TypeMeta: v1.TypeMeta{
Kind: "AdmissionConfiguration",
APIVersion: apiserverv1alpha1.SchemeGroupVersion.String(),
},
}
return admissionConfiguration, nil
}
func (c *Cluster) setClusterImageDefaults() error {

View File

@ -1,11 +1,10 @@
package cluster
import (
"context"
"fmt"
"strings"
"context"
"github.com/docker/docker/api/types"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
@ -15,6 +14,8 @@ import (
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"
"sigs.k8s.io/yaml"
)
const (
@ -118,6 +119,69 @@ func (c *Cluster) InvertIndexHosts() error {
return nil
}
func (c *Cluster) getConsolidatedAdmissionConfiguration() (*v1alpha1.AdmissionConfiguration, error) {
var err error
var admissionConfig *v1alpha1.AdmissionConfiguration
if c.Services.KubeAPI.EventRateLimit == nil ||
!c.Services.KubeAPI.EventRateLimit.Enabled {
return c.Services.KubeAPI.AdmissionConfiguration, nil
}
logrus.Debugf("EventRateLimit is enabled")
found := false
if c.Services.KubeAPI.AdmissionConfiguration != nil {
plugins := c.Services.KubeAPI.AdmissionConfiguration.Plugins
for _, plugin := range plugins {
if plugin.Name == EventRateLimitPluginName {
found = true
break
}
}
}
if found {
logrus.Debugf("EventRateLimit Plugin configuration found in admission config")
if c.Services.KubeAPI.EventRateLimit.Configuration != nil {
logrus.Warnf("conflicting EventRateLimit configuration found, using the one from Admission Configuration")
return c.Services.KubeAPI.AdmissionConfiguration, nil
}
}
logrus.Debugf("EventRateLimit Plugin configuration not found in admission config")
if c.Services.KubeAPI.AdmissionConfiguration == nil {
logrus.Debugf("no user specified admission configuration found")
admissionConfig, err = newDefaultAdmissionConfiguration()
if err != nil {
logrus.Errorf("error getting default admission configuration: %v", err)
return nil, err
}
} else {
admissionConfig, err = newDefaultAdmissionConfiguration()
if err != nil {
logrus.Errorf("error getting default admission configuration: %v", err)
return nil, err
}
copy(admissionConfig.Plugins, c.Services.KubeAPI.AdmissionConfiguration.Plugins)
}
if c.Services.KubeAPI.EventRateLimit.Configuration != nil {
logrus.Debugf("user specified EventRateLimit configuration found")
p, err := getEventRateLimitPluginFromConfig(c.Services.KubeAPI.EventRateLimit.Configuration)
if err != nil {
logrus.Errorf("error getting eventratelimit plugin from config: %v", err)
}
admissionConfig.Plugins = append(admissionConfig.Plugins, p)
} else {
logrus.Debugf("using default EventRateLimit configuration")
p, err := newDefaultEventRateLimitPlugin()
if err != nil {
logrus.Errorf("error getting default eventratelimit plugin: %v", err)
}
admissionConfig.Plugins = append(admissionConfig.Plugins, p)
}
return admissionConfig, nil
}
func (c *Cluster) SetUpHosts(ctx context.Context, flags ExternalFlags) error {
if c.AuthnStrategies[AuthnX509Provider] {
log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes")
@ -162,6 +226,38 @@ func (c *Cluster) SetUpHosts(ctx context.Context, flags ExternalFlags) error {
return err
}
}
if _, ok := c.Services.KubeAPI.ExtraArgs[KubeAPIArgAdmissionControlConfigFile]; !ok {
if c.Services.KubeAPI.EventRateLimit != nil && c.Services.KubeAPI.EventRateLimit.Enabled {
controlPlaneHosts := hosts.GetUniqueHostList(nil, c.ControlPlaneHosts, nil)
ac, err := c.getConsolidatedAdmissionConfiguration()
if err != nil {
return fmt.Errorf("error getting consolidated admission configuration: %v", err)
}
bytes, err := yaml.Marshal(ac)
if err != nil {
return err
}
if err := deployFile(ctx, controlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, DefaultKubeAPIArgAdmissionControlConfigFileValue, string(bytes)); err != nil {
return err
}
log.Infof(ctx, "[%s] Successfully deployed admission control config to Cluster control nodes", DefaultKubeAPIArgAdmissionControlConfigFileValue)
}
}
if _, ok := c.Services.KubeAPI.ExtraArgs[KubeAPIArgAuditPolicyFile]; !ok {
if c.Services.KubeAPI.AuditLog != nil && c.Services.KubeAPI.AuditLog.Enabled {
controlPlaneHosts := hosts.GetUniqueHostList(nil, c.ControlPlaneHosts, nil)
bytes, err := yaml.Marshal(c.Services.KubeAPI.AuditLog.Configuration.Policy)
if err != nil {
return err
}
if err := deployFile(ctx, controlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, DefaultKubeAPIArgAuditPolicyFileValue, string(bytes)); err != nil {
return err
}
log.Infof(ctx, "[%s] Successfully deployed audit policy file to Cluster control nodes", DefaultKubeAPIArgAuditPolicyFileValue)
}
}
}
return nil
}

View File

@ -263,12 +263,26 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, prefixPath string, svcOp
CommandArgs[admissionControlOptionName] = CommandArgs[admissionControlOptionName] + ",AlwaysPullImages"
}
if c.Services.KubeAPI.AuditLog != nil {
if alc := c.Services.KubeAPI.AuditLog.Configuration; alc != nil {
CommandArgs[KubeAPIArgAuditLogPath] = alc.Path
CommandArgs[KubeAPIArgAuditLogMaxAge] = strconv.Itoa(alc.MaxAge)
CommandArgs[KubeAPIArgAuditLogMaxBackup] = strconv.Itoa(alc.MaxBackup)
CommandArgs[KubeAPIArgAuditLogMaxSize] = strconv.Itoa(alc.MaxSize)
CommandArgs[KubeAPIArgAuditLogFormat] = alc.Format
CommandArgs[KubeAPIArgAuditPolicyFile] = DefaultKubeAPIArgAuditPolicyFileValue
}
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
}
if c.Services.KubeAPI.AuditLog != nil && c.Services.KubeAPI.AuditLog.Enabled {
Binds = append(Binds, fmt.Sprintf("%s:/var/log/kube-audit:z", path.Join(prefixPath, "/var/log/kube-audit")))
}
// Override args if they exist, add additional args
for arg, value := range c.Services.KubeAPI.ExtraArgs {