Merge pull request #60739 from tallclair/audit-buffer

Automatic merge from submit-queue (batch tested with PRs 60737, 60739, 61080, 60968, 60951). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix default auditing options.

- Log backend defaults to blocking mode (backwards compatability)
- Webhook backend defaults to throttled
- Fix webhook validation
- Add options test

**Which issue(s) this PR fixes**:
Fixes #60719

**Special notes for your reviewer**:
This PR is an alternative fix to https://github.com/kubernetes/kubernetes/pull/60727. If the rollback goes in first, I'll rebase this on a roll-forward.

**Release note**:
-->
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-03-13 12:26:57 -07:00 committed by GitHub
commit c13d9ffea9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 280 additions and 47 deletions

View File

@ -17,6 +17,9 @@ limitations under the License.
package audit
import (
"fmt"
"strings"
"k8s.io/apimachinery/pkg/util/errors"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
@ -55,3 +58,11 @@ func (u union) Shutdown() {
backend.Shutdown()
}
}
func (u union) String() string {
var backendStrings []string
for _, backend := range u.backends {
backendStrings = append(backendStrings, fmt.Sprintf("%s", backend))
}
return fmt.Sprintf("union[%s]", strings.Join(backendStrings, ","))
}

View File

@ -74,12 +74,15 @@ go_test(
name = "go_default_test",
srcs = [
"admission_test.go",
"audit_test.go",
"serving_test.go",
],
data = glob(["testdata/**"]),
embed = [":go_default_library"],
deps = [
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@ -90,6 +93,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
],
)

View File

@ -116,15 +116,15 @@ func NewAuditOptions() *AuditOptions {
WebhookOptions: AuditWebhookOptions{
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: defaultLogBatchConfig,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
},
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
},
LogOptions: AuditLogOptions{
Format: pluginlog.FormatJson,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
Mode: ModeBlocking,
BatchConfig: defaultLogBatchConfig,
},
},
}
@ -145,46 +145,10 @@ func (o *AuditOptions) Validate() []error {
if len(o.WebhookOptions.ConfigFile) > 0 {
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
}
} else {
// check webhook configuration
if err := validateBackendMode(pluginwebhook.PluginName, o.WebhookOptions.BatchOptions.Mode); err != nil {
allErrors = append(allErrors, err)
}
if err := validateBackendBatchConfig(pluginwebhook.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
allErrors = append(allErrors, err)
}
// check log configuration
if err := validateBackendMode(pluginlog.PluginName, o.LogOptions.BatchOptions.Mode); err != nil {
allErrors = append(allErrors, err)
}
if err := validateBackendBatchConfig(pluginlog.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
allErrors = append(allErrors, err)
}
// Check log format
validFormat := false
for _, f := range pluginlog.AllowedFormats {
if f == o.LogOptions.Format {
validFormat = true
break
}
}
if !validFormat {
allErrors = append(allErrors, fmt.Errorf("invalid audit log format %s, allowed formats are %q", o.LogOptions.Format, strings.Join(pluginlog.AllowedFormats, ",")))
}
}
// Check validities of MaxAge, MaxBackups and MaxSize of log options
if o.LogOptions.MaxAge < 0 {
allErrors = append(allErrors, fmt.Errorf("--audit-log-maxage %v can't be a negative number", o.LogOptions.MaxAge))
}
if o.LogOptions.MaxBackups < 0 {
allErrors = append(allErrors, fmt.Errorf("--audit-log-maxbackup %v can't be a negative number", o.LogOptions.MaxBackups))
}
if o.LogOptions.MaxSize < 0 {
allErrors = append(allErrors, fmt.Errorf("--audit-log-maxsize %v can't be a negative number", o.LogOptions.MaxSize))
}
allErrors = append(allErrors, o.LogOptions.Validate()...)
allErrors = append(allErrors, o.WebhookOptions.Validate()...)
return allErrors
}
@ -198,7 +162,15 @@ func validateBackendMode(pluginName string, mode string) error {
return fmt.Errorf("invalid audit %s mode %s, allowed modes are %q", pluginName, mode, strings.Join(AllowedModes, ","))
}
func validateBackendBatchConfig(pluginName string, config pluginbuffered.BatchConfig) error {
func validateBackendBatchOptions(pluginName string, options AuditBatchOptions) error {
if err := validateBackendMode(pluginName, options.Mode); err != nil {
return err
}
if options.Mode != ModeBatch {
// Don't validate the unused options.
return nil
}
config := options.BatchConfig
if config.BufferSize <= 0 {
return fmt.Errorf("invalid audit batch %s buffer size %v, must be a positive number", pluginName, config.BufferSize)
}
@ -317,8 +289,52 @@ func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
" gate. Known formats are "+strings.Join(pluginlog.AllowedFormats, ",")+".")
}
func (o *AuditLogOptions) Validate() []error {
// Check whether the log backend is enabled based on the options.
if !o.enabled() {
return nil
}
var allErrors []error
if advancedAuditingEnabled() {
if err := validateBackendBatchOptions(pluginlog.PluginName, o.BatchOptions); err != nil {
allErrors = append(allErrors, err)
}
// Check log format
validFormat := false
for _, f := range pluginlog.AllowedFormats {
if f == o.Format {
validFormat = true
break
}
}
if !validFormat {
allErrors = append(allErrors, fmt.Errorf("invalid audit log format %s, allowed formats are %q", o.Format, strings.Join(pluginlog.AllowedFormats, ",")))
}
}
// Check validities of MaxAge, MaxBackups and MaxSize of log options, if file log backend is enabled.
if o.MaxAge < 0 {
allErrors = append(allErrors, fmt.Errorf("--audit-log-maxage %v can't be a negative number", o.MaxAge))
}
if o.MaxBackups < 0 {
allErrors = append(allErrors, fmt.Errorf("--audit-log-maxbackup %v can't be a negative number", o.MaxBackups))
}
if o.MaxSize < 0 {
allErrors = append(allErrors, fmt.Errorf("--audit-log-maxsize %v can't be a negative number", o.MaxSize))
}
return allErrors
}
// Check whether the log backend is enabled based on the options.
func (o *AuditLogOptions) enabled() bool {
return o != nil && o.Path != ""
}
func (o *AuditLogOptions) getWriter() io.Writer {
if o.Path == "" {
if !o.enabled() {
return nil
}
@ -359,8 +375,26 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
"Deprecated, use --audit-webhook-initial-backoff instead.")
}
func (o *AuditWebhookOptions) Validate() []error {
if !o.enabled() {
return nil
}
var allErrors []error
if advancedAuditingEnabled() {
if err := validateBackendBatchOptions(pluginwebhook.PluginName, o.BatchOptions); err != nil {
allErrors = append(allErrors, err)
}
}
return allErrors
}
func (o *AuditWebhookOptions) enabled() bool {
return o != nil && o.ConfigFile != ""
}
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
if o.ConfigFile == "" {
if !o.enabled() {
return nil
}

View File

@ -0,0 +1,172 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
stdjson "encoding/json"
"fmt"
"io/ioutil"
"os"
"testing"
"k8s.io/apiserver/pkg/server"
"k8s.io/client-go/tools/clientcmd/api/v1"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAuditValidOptions(t *testing.T) {
webhookConfig := makeTmpWebhookConfig(t)
defer os.Remove(webhookConfig)
testCases := []struct {
name string
options func() *AuditOptions
expected string
}{{
name: "default",
options: NewAuditOptions,
}, {
name: "default log",
options: func() *AuditOptions {
o := NewAuditOptions()
o.LogOptions.Path = "/audit"
return o
},
expected: "log",
}, {
name: "default webhook",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = webhookConfig
return o
},
expected: "buffered<webhook>",
}, {
name: "default union",
options: func() *AuditOptions {
o := NewAuditOptions()
o.LogOptions.Path = "/audit"
o.WebhookOptions.ConfigFile = webhookConfig
return o
},
expected: "union[log,buffered<webhook>]",
}, {
name: "custom",
options: func() *AuditOptions {
o := NewAuditOptions()
o.LogOptions.BatchOptions.Mode = ModeBatch
o.LogOptions.Path = "/audit"
o.WebhookOptions.BatchOptions.Mode = ModeBlocking
o.WebhookOptions.ConfigFile = webhookConfig
return o
},
expected: "union[buffered<log>,webhook]",
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
options := tc.options()
require.NotNil(t, options)
// Verify flags don't change defaults.
fs := pflag.NewFlagSet("Test", pflag.PanicOnError)
options.AddFlags(fs)
require.NoError(t, fs.Parse(nil))
assert.Equal(t, tc.options(), options, "Flag defaults should match default options.")
assert.Empty(t, options.Validate(), "Options should be valid.")
config := &server.Config{}
require.NoError(t, options.ApplyTo(config))
if tc.expected == "" {
assert.Nil(t, config.AuditBackend)
} else {
assert.Equal(t, tc.expected, fmt.Sprintf("%s", config.AuditBackend))
}
})
}
}
func TestAuditInvalidOptions(t *testing.T) {
testCases := []struct {
name string
options func() *AuditOptions
}{{
name: "invalid log format",
options: func() *AuditOptions {
o := NewAuditOptions()
o.LogOptions.Path = "/audit"
o.LogOptions.Format = "foo"
return o
},
}, {
name: "invalid log mode",
options: func() *AuditOptions {
o := NewAuditOptions()
o.LogOptions.Path = "/audit"
o.LogOptions.BatchOptions.Mode = "foo"
return o
},
}, {
name: "invalid log buffer size",
options: func() *AuditOptions {
o := NewAuditOptions()
o.LogOptions.Path = "/audit"
o.LogOptions.BatchOptions.Mode = "batch"
o.LogOptions.BatchOptions.BatchConfig.BufferSize = -3
return o
},
}, {
name: "invalid webhook mode",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = "/audit"
o.WebhookOptions.BatchOptions.Mode = "foo"
return o
},
}, {
name: "invalid webhook buffer throttle qps",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = "/audit"
o.WebhookOptions.BatchOptions.Mode = "batch"
o.WebhookOptions.BatchOptions.BatchConfig.ThrottleQPS = -1
return o
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
options := tc.options()
require.NotNil(t, options)
assert.NotEmpty(t, options.Validate(), "Options should be invalid.")
})
}
}
func makeTmpWebhookConfig(t *testing.T) string {
config := v1.Config{
Clusters: []v1.NamedCluster{
{Cluster: v1.Cluster{Server: "localhost", InsecureSkipTLSVerify: true}},
},
}
f, err := ioutil.TempFile("", "k8s_audit_webhook_test_")
require.NoError(t, err, "creating temp file")
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing webhook kubeconfig")
require.NoError(t, f.Close())
return f.Name()
}

View File

@ -28,8 +28,8 @@ import (
"k8s.io/client-go/util/flowcontrol"
)
// The plugin name reported in error metrics.
const pluginName = "buffered"
// PluginName is the name reported in error metrics.
const PluginName = "buffered"
const (
// Default configuration values for ModeBatch.
@ -259,7 +259,7 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
sendErr = fmt.Errorf("audit backend shut down")
}
if sendErr != nil {
audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...)
audit.HandlePluginError(PluginName, sendErr, ev[evIndex:]...)
}
}()
@ -277,3 +277,7 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
}
}
}
func (b *bufferedBackend) String() string {
return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
}

View File

@ -94,3 +94,7 @@ func (b *backend) Run(stopCh <-chan struct{}) error {
func (b *backend) Shutdown() {
// Nothing to do here.
}
func (b *backend) String() string {
return PluginName
}

View File

@ -101,3 +101,7 @@ func (b *backend) processEvents(ev ...*auditinternal.Event) error {
return b.w.RestClient.Post().Body(&list).Do()
}).Error()
}
func (b *backend) String() string {
return PluginName
}