component-base/logs: improve handling of re-applying a configuration

Normal binaries should never have to do this. It's not safe when there are
already some goroutines running which might do logging. Therefore the new
default is to return an error when a binary accidentally re-applies.

A few unit ensure that there are no goroutines and have to call the functions
more then once. The new ResetForTest API gets used by those to enable changing the
logging settings more than once in the same process.

Integration tests use the same code as the normal binaries. To make reuse of
that code safe, component-base/logs can be configured to silently ignore any
additional calls. This addresses data races that were found when enabling -race
for integration tests. To catch cases where the integration test does want
to modify the config, the old and new config get compared and an error is
raised when it's not the same.

To avoid having to modify all integration tests which start test servers,
reconfiguring component-base/logs is done by the test server packages.
This commit is contained in:
Patrick Ohly 2023-04-05 09:24:15 +02:00
parent a374d893f0
commit 02efe09abe
10 changed files with 189 additions and 7 deletions

View File

@ -42,6 +42,7 @@ import (
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/cert"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
@ -50,6 +51,13 @@ import (
testutil "k8s.io/kubernetes/test/utils"
)
func init() {
// If instantiated more than once or together with other servers, the
// servers would try to modify the global logging state. This must get
// ignored during testing.
logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
}
// This key is for testing purposes only and is not considered secure.
const ecdsaPrivateKey = `-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIEZmTmUhuanLjPA2CLquXivuwBDHTt5XYwgIr/kA1LtRoAoGCCqGSM49

View File

@ -28,14 +28,21 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
)
func init() {
// If instantiated more than once or together with other servers, the
// servers would try to modify the global logging state. This must get
// ignored during testing.
logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
}
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()

View File

@ -29,6 +29,7 @@ import (
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/configz"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
@ -36,6 +37,13 @@ import (
"k8s.io/klog/v2"
)
func init() {
// If instantiated more than once or together with other servers, the
// servers would try to modify the global logging state. This must get
// ignored during testing.
logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
}
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()

View File

@ -34,9 +34,17 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
)
func init() {
// If instantiated more than once or together with other servers, the
// servers would try to modify the global logging state. This must get
// ignored during testing.
logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
}
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()

View File

@ -34,9 +34,26 @@ import (
"k8s.io/cloud-provider/names"
"k8s.io/cloud-provider/options"
cliflag "k8s.io/component-base/cli/flag"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
)
func init() {
// If instantiated more than once or together with other servers, the
// servers would try to modify the global logging state. This must get
// ignored during testing.
logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
// Because the test server gets started after other goroutines are
// running already, we also have to initialize logging here when
// those goroutines are not running yet. This works because the
// test server uses the default config.
config := logsapi.NewLoggingConfiguration()
if err := logsapi.ValidateAndApply(config, nil); err != nil {
panic(err)
}
}
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()

View File

@ -17,14 +17,17 @@ limitations under the License.
package v1
import (
"errors"
"flag"
"fmt"
"io"
"math"
"os"
"strings"
"sync/atomic"
"time"
"github.com/google/go-cmp/cmp"
"github.com/spf13/pflag"
"k8s.io/klog/v2"
@ -57,6 +60,24 @@ func NewLoggingConfiguration() *LoggingConfiguration {
return &c
}
// Applying configurations multiple times is not safe unless it's guaranteed that there
// are no goroutines which might call logging functions. The default for ValidateAndApply
// and ValidateAndApplyWithOptions is to return an error when called more than once.
// Binaries and unit tests can override that behavior.
var ReapplyHandling = ReapplyHandlingError
type ReapplyHandlingType int
const (
// ReapplyHandlingError is the default: calling ValidateAndApply or
// ValidateAndApplyWithOptions again returns an error.
ReapplyHandlingError ReapplyHandlingType = iota
// ReapplyHandlingIgnoreUnchanged silently ignores any additional calls of
// ValidateAndApply or ValidateAndApplyWithOptions if the configuration
// is unchanged, otherwise they return an error.
ReapplyHandlingIgnoreUnchanged
)
// ValidateAndApply combines validation and application of the logging configuration.
// This should be invoked as early as possible because then the rest of the program
// startup (including validation of other options) will already run with the final
@ -64,6 +85,10 @@ func NewLoggingConfiguration() *LoggingConfiguration {
//
// The optional FeatureGate controls logging features. If nil, the default for
// these features is used.
//
// Logging options must be applied as early as possible during the program
// startup. Some changes are global and cannot be done safely when there are
// already goroutines running.
func ValidateAndApply(c *LoggingConfiguration, featureGate featuregate.FeatureGate) error {
return validateAndApply(c, nil, featureGate, nil)
}
@ -71,6 +96,10 @@ func ValidateAndApply(c *LoggingConfiguration, featureGate featuregate.FeatureGa
// ValidateAndApplyWithOptions is a variant of ValidateAndApply which accepts
// additional options beyond those that can be configured through the API. This
// is meant for testing.
//
// Logging options must be applied as early as possible during the program
// startup. Some changes are global and cannot be done safely when there are
// already goroutines running.
func ValidateAndApplyWithOptions(c *LoggingConfiguration, options *LoggingOptions, featureGate featuregate.FeatureGate) error {
return validateAndApply(c, options, featureGate, nil)
}
@ -183,10 +212,30 @@ func featureEnabled(featureGate featuregate.FeatureGate, feature featuregate.Fea
}
func apply(c *LoggingConfiguration, options *LoggingOptions, featureGate featuregate.FeatureGate) error {
contextualLoggingEnabled := contextualLoggingDefault
if featureGate != nil {
contextualLoggingEnabled = featureGate.Enabled(ContextualLogging)
p := &parameters{
C: c,
Options: options,
ContextualLoggingEnabled: contextualLoggingDefault,
}
if featureGate != nil {
p.ContextualLoggingEnabled = featureGate.Enabled(ContextualLogging)
}
oldP := applyParameters.Load()
if oldP != nil {
switch ReapplyHandling {
case ReapplyHandlingError:
return errors.New("logging configuration was already applied earlier, changing it is not allowed")
case ReapplyHandlingIgnoreUnchanged:
if diff := cmp.Diff(oldP, p); diff != "" {
return fmt.Errorf("the logging configuration should not be changed after setting it once (- old setting, + new setting):\n%s", diff)
}
return nil
default:
return fmt.Errorf("invalid value %d for ReapplyHandling", ReapplyHandling)
}
}
applyParameters.Store(p)
// if log format not exists, use nil loggr
format, _ := logRegistry.get(c.Format)
@ -205,7 +254,7 @@ func apply(c *LoggingConfiguration, options *LoggingOptions, featureGate feature
defer setverbositylevel.Mutex.Unlock()
setverbositylevel.Callbacks = append(setverbositylevel.Callbacks, control.SetVerbosityLevel)
}
klog.SetLoggerWithOptions(log, klog.ContextualLogger(contextualLoggingEnabled), klog.FlushLogger(control.Flush))
klog.SetLoggerWithOptions(log, klog.ContextualLogger(p.ContextualLoggingEnabled), klog.FlushLogger(control.Flush))
}
if err := loggingFlags.Lookup("v").Value.Set(VerbosityLevelPflag(&c.Verbosity).String()); err != nil {
return fmt.Errorf("internal error while setting klog verbosity: %v", err)
@ -214,7 +263,40 @@ func apply(c *LoggingConfiguration, options *LoggingOptions, featureGate feature
return fmt.Errorf("internal error while setting klog vmodule: %v", err)
}
klog.StartFlushDaemon(c.FlushFrequency)
klog.EnableContextualLogging(contextualLoggingEnabled)
klog.EnableContextualLogging(p.ContextualLoggingEnabled)
return nil
}
type parameters struct {
C *LoggingConfiguration
Options *LoggingOptions
ContextualLoggingEnabled bool
}
var applyParameters atomic.Pointer[parameters]
// ResetForTest restores the default settings. This is not thread-safe and should only
// be used when there are no goroutines running. The intended users are unit
// tests in other packages.
func ResetForTest(featureGate featuregate.FeatureGate) error {
oldP := applyParameters.Load()
if oldP == nil {
// Nothing to do.
return nil
}
// This makes it possible to call apply again without triggering errors.
applyParameters.Store(nil)
// Restore defaults. Shouldn't fail, but check anyway.
config := NewLoggingConfiguration()
if err := ValidateAndApply(config, featureGate); err != nil {
return fmt.Errorf("apply default configuration: %v", err)
}
// And again...
applyParameters.Store(nil)
return nil
}

View File

@ -32,6 +32,34 @@ import (
"k8s.io/klog/v2"
)
func TestReapply(t *testing.T) {
oldReapplyHandling := ReapplyHandling
defer func() {
ReapplyHandling = oldReapplyHandling
if err := ResetForTest(nil /* feature gates */); err != nil {
t.Errorf("Unexpected error resetting the logging configuration: %v", err)
}
}()
newOptions := NewLoggingConfiguration()
if err := ValidateAndApply(newOptions, nil); err != nil {
t.Errorf("unexpected error for first ValidateAndApply: %v", err)
}
ReapplyHandling = ReapplyHandlingError
if err := ValidateAndApply(newOptions, nil); err == nil {
t.Error("did not get expected error for second ValidateAndApply")
}
ReapplyHandling = ReapplyHandlingIgnoreUnchanged
if err := ValidateAndApply(newOptions, nil); err != nil {
t.Errorf("unexpected error for third ValidateAndApply: %v", err)
}
modifiedOptions := newOptions.DeepCopy()
modifiedOptions.Verbosity = 100
if err := ValidateAndApply(modifiedOptions, nil); err == nil {
t.Errorf("unexpected success for forth ValidateAndApply, should have complained about modified config")
}
}
func TestOptions(t *testing.T) {
newOptions := NewLoggingConfiguration()
testcases := []struct {
@ -75,6 +103,11 @@ func TestOptions(t *testing.T) {
if !assert.Equal(t, tc.want, c) {
t.Errorf("Wrong Validate() result for %q. expect %v, got %v", tc.name, tc.want, c)
}
defer func() {
if err := ResetForTest(nil /* feature gates */); err != nil {
t.Errorf("Unexpected error resetting the logging configuration: %v", err)
}
}()
errs := ValidateAndApply(c, nil /* We don't care about feature gates here. */)
defer klog.StopFlushDaemon()
if !assert.ElementsMatch(t, tc.errs, errs) {
@ -182,6 +215,11 @@ func testContextualLogging(t *testing.T, enabled bool) {
AddFeatureGates(featureGate)
err = featureGate.SetFromMap(map[string]bool{string(ContextualLogging): enabled})
require.NoError(t, err)
defer func() {
if err := ResetForTest(nil /* feature gates */); err != nil {
t.Errorf("Unexpected error resetting the logging configuration: %v", err)
}
}()
err = ValidateAndApply(c, featureGate)
require.NoError(t, err)
defer klog.StopFlushDaemon()

View File

@ -155,8 +155,12 @@ func TestJSONFormatRegister(t *testing.T) {
err := mutable.SetFromMap(map[string]bool{string(logsapi.ContextualLogging): tc.contextualLogging})
require.NoError(t, err)
featureGate = mutable
defer func() {
if err := logsapi.ResetForTest(featureGate); err != nil {
t.Errorf("Unexpected error while resetting the logging configuration: %v", err)
}
}()
errs := logsapi.ValidateAndApply(c, featureGate)
defer klog.ClearLogger()
if !assert.ElementsMatch(t, tc.errs, errs) {
t.Errorf("Wrong Validate() result for %q.\n expect:\t%+v\n got:\t%+v", tc.name, tc.errs, errs)

View File

@ -81,6 +81,11 @@ func TestGlogSetter(t *testing.T) {
}
os.Stderr = newStderr
defer func() {
if err := logsapi.ResetForTest(nil /* feature gates */); err != nil {
t.Errorf("Unexpected error resetting the logging configuration: %v", err)
}
}()
tc.init(t)
// First write with default verbosity level of 0 -> no output.

View File

@ -194,6 +194,11 @@ func TestData(t *testing.T) {
InfoStream: &buffer,
}
klog.SetOutput(&buffer)
defer func() {
if err := logsapi.ResetForTest(nil); err != nil {
t.Errorf("Unexpected error resetting the logging configuration: %v", err)
}
}()
if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil {
t.Fatalf("Unexpected error configuring logging: %v", err)
}