Merge pull request #72540 from immutableT/expose-kms-timeout-in-config-v2

Expose kms timeout value via encryption config.
This commit is contained in:
Kubernetes Prow Robot 2019-01-09 07:39:49 -08:00 committed by GitHub
commit 15521d0274
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 256 additions and 16 deletions

View File

@ -86,4 +86,7 @@ type KMSConfiguration struct {
CacheSize int32
// endpoint is the gRPC server listening address, for example "unix:///var/run/kms-provider.sock".
Endpoint string
// Timeout for gRPC calls to kms-plugin (ex. 5s). The default is 3 seconds.
// +optional
Timeout *metav1.Duration
}

View File

@ -84,4 +84,7 @@ type KMSConfiguration struct {
CacheSize int32 `json:"cachesize,omitempty"`
// endpoint is the gRPC server listening address, for example "unix:///var/run/kms-provider.sock".
Endpoint string `json:"endpoint"`
// Timeout for gRPC calls to kms-plugin (ex. 5s). The default is 3 seconds.
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty"`
}

View File

@ -23,6 +23,7 @@ package v1
import (
unsafe "unsafe"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
config "k8s.io/apiserver/pkg/apis/config"
@ -180,6 +181,7 @@ func autoConvert_v1_KMSConfiguration_To_config_KMSConfiguration(in *KMSConfigura
out.Name = in.Name
out.CacheSize = in.CacheSize
out.Endpoint = in.Endpoint
out.Timeout = (*metav1.Duration)(unsafe.Pointer(in.Timeout))
return nil
}
@ -192,6 +194,7 @@ func autoConvert_config_KMSConfiguration_To_v1_KMSConfiguration(in *config.KMSCo
out.Name = in.Name
out.CacheSize = in.CacheSize
out.Endpoint = in.Endpoint
out.Timeout = (*metav1.Duration)(unsafe.Pointer(in.Timeout))
return nil
}

View File

@ -21,6 +21,7 @@ limitations under the License.
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@ -96,6 +97,11 @@ func (in *IdentityConfiguration) DeepCopy() *IdentityConfiguration {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KMSConfiguration) DeepCopyInto(out *KMSConfiguration) {
*out = *in
if in.Timeout != nil {
in, out := &in.Timeout, &out.Timeout
*out = new(metav1.Duration)
**out = **in
}
return
}
@ -151,7 +157,7 @@ func (in *ProviderConfiguration) DeepCopyInto(out *ProviderConfiguration) {
if in.KMS != nil {
in, out := &in.KMS, &out.KMS
*out = new(KMSConfiguration)
**out = **in
(*in).DeepCopyInto(*out)
}
return
}

View File

@ -21,6 +21,7 @@ limitations under the License.
package config
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@ -96,6 +97,11 @@ func (in *IdentityConfiguration) DeepCopy() *IdentityConfiguration {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KMSConfiguration) DeepCopyInto(out *KMSConfiguration) {
*out = *in
if in.Timeout != nil {
in, out := &in.Timeout, &out.Timeout
*out = new(v1.Duration)
**out = **in
}
return
}
@ -151,7 +157,7 @@ func (in *ProviderConfiguration) DeepCopyInto(out *ProviderConfiguration) {
if in.KMS != nil {
in, out := &in.KMS, &out.KMS
*out = new(KMSConfiguration)
**out = **in
(*in).DeepCopyInto(*out)
}
return
}

View File

@ -173,8 +173,16 @@ func GetPrefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]val
return nil, fmt.Errorf("remote KMS provider can't use empty string as endpoint")
}
timeout := kmsPluginConnectionTimeout
if provider.KMS.Timeout != nil {
if provider.KMS.Timeout.Duration <= 0 {
return nil, fmt.Errorf("could not configure KMS plugin %q, timeout should be a positive value", provider.KMS.Name)
}
timeout = provider.KMS.Timeout.Duration
}
// Get gRPC client service with endpoint.
envelopeService, err := envelopeServiceFactory(provider.KMS.Endpoint, kmsPluginConnectionTimeout)
envelopeService, err := envelopeServiceFactory(provider.KMS.Endpoint, timeout)
if err != nil {
return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err)
}

View File

@ -419,3 +419,91 @@ func TestEncryptionProviderConfigNoEndpointForKMS(t *testing.T) {
t.Fatalf("invalid configuration file (kms has no endpoint) got parsed:\n%s", incorrectConfigNoEndpointForKMS)
}
}
func TestKMSConfigTimeout(t *testing.T) {
testCases := []struct {
desc string
config string
want time.Duration
wantErr string
}{
{
desc: "duration explicitly provided",
config: `kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
name: foo
endpoint: unix:///tmp/testprovider.sock
timeout: 15s
`,
want: 15 * time.Second,
},
{
desc: "duration explicitly provided as 0 which is an invalid value, error should be returned",
config: `kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
name: foo
endpoint: unix:///tmp/testprovider.sock
timeout: 0s
`,
wantErr: "timeout should be a positive value",
},
{
desc: "duration is not provided, default will be supplied",
config: `kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
name: foo
endpoint: unix:///tmp/testprovider.sock
`,
want: kmsPluginConnectionTimeout,
},
{
desc: "duration is invalid (negative), error should be returned",
config: `kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
name: foo
endpoint: unix:///tmp/testprovider.sock
timeout: -15s
`,
wantErr: "timeout should be a positive value",
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
// mocking envelopeServiceFactory to sense the value of the supplied timeout.
envelopeServiceFactory = func(endpoint string, callTimeout time.Duration) (envelope.Service, error) {
if callTimeout != tt.want {
t.Fatalf("got timeout: %v, want %v", callTimeout, tt.want)
}
return newMockEnvelopeService(endpoint, callTimeout)
}
// mocked envelopeServiceFactory is called during ParseEncryptionConfiguration.
if _, err := ParseEncryptionConfiguration(strings.NewReader(tt.config)); err != nil && !strings.Contains(err.Error(), tt.wantErr) {
t.Fatalf("unable to parse yaml\n%s\nerror: %v", tt.config, err)
}
})
}
}

View File

@ -36,42 +36,52 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:android": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:darwin": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:nacl": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:netbsd": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:openbsd": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:plan9": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
"@io_bazel_rules_go//go/platform:solaris": [
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],

View File

@ -31,18 +31,17 @@ import (
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/uuid"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
)
const (
endpoint = "unix:///@kms-socket.sock"
)
// TestKMSPluginLateStart tests the scenario where kms-plugin pod/container starts after kube-apiserver pod/container.
// Since the Dial to kms-plugin is non-blocking we expect the construction of gRPC service to succeed even when
// kms-plugin is not yet up - dialing happens in the background.
func TestKMSPluginLateStart(t *testing.T) {
t.Parallel()
callTimeout := 3 * time.Second
endpoint := getSocketName()
service, err := NewGRPCService(endpoint, callTimeout)
if err != nil {
@ -51,7 +50,7 @@ func TestKMSPluginLateStart(t *testing.T) {
defer destroyService(service)
time.Sleep(callTimeout / 2)
f, err := startFakeKMSProvider(kmsapiVersion)
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
@ -64,17 +63,119 @@ func TestKMSPluginLateStart(t *testing.T) {
}
}
// TestTimeout tests behaviour of the kube-apiserver based on the supplied timeout and delayed start of kms-plugin.
func TestTimeouts(t *testing.T) {
t.Parallel()
var testCases = []struct {
desc string
callTimeout time.Duration
pluginDelay time.Duration
kubeAPIServerDelay time.Duration
wantErr string
}{
{
desc: "timeout zero - expect failure when call from kube-apiserver arrives before plugin starts",
callTimeout: 0 * time.Second,
pluginDelay: 3 * time.Second,
wantErr: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
},
{
desc: "timeout zero but kms-plugin already up - still failure - zero timeout is an invalid value",
callTimeout: 0 * time.Second,
pluginDelay: 0 * time.Second,
kubeAPIServerDelay: 2 * time.Second,
wantErr: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
},
{
desc: "timeout greater than kms-plugin delay - expect success",
callTimeout: 6 * time.Second,
pluginDelay: 3 * time.Second,
},
{
desc: "timeout less than kms-plugin delay - expect failure",
callTimeout: 3 * time.Second,
pluginDelay: 6 * time.Second,
wantErr: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
},
}
for _, tt := range testCases {
tt := tt
t.Run(tt.desc, func(t *testing.T) {
t.Parallel()
var (
service Service
err error
data = []byte("test data")
kubeAPIServerWG sync.WaitGroup
kmsPluginWG sync.WaitGroup
testCompletedWG sync.WaitGroup
socketName = getSocketName()
)
testCompletedWG.Add(1)
defer testCompletedWG.Done()
kubeAPIServerWG.Add(1)
go func() {
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
time.Sleep(tt.kubeAPIServerDelay)
service, err = NewGRPCService(socketName, tt.callTimeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
kubeAPIServerWG.Done()
// Keeping kube-apiserver up to process requests.
testCompletedWG.Wait()
}()
kmsPluginWG.Add(1)
go func() {
// Simulating delayed start of kms-plugin, kube-apiserver is up before the plugin, if requested by the testcase.
time.Sleep(tt.pluginDelay)
f, err := startFakeKMSProvider(kmsapiVersion, socketName)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
defer f.server.Stop()
kmsPluginWG.Done()
// Keeping plugin up to process requests.
testCompletedWG.Wait()
}()
kubeAPIServerWG.Wait()
_, err = service.Encrypt(data)
if err == nil && tt.wantErr != "" {
t.Fatalf("got nil, want %s", tt.wantErr)
}
if err != nil && tt.wantErr == "" {
t.Fatalf("got %q, want nil", err.Error())
}
// Collecting kms-plugin - allowing plugin to clean-up.
kmsPluginWG.Wait()
})
}
}
// TestIntermittentConnectionLoss tests the scenario where the connection with kms-plugin is intermittently lost.
func TestIntermittentConnectionLoss(t *testing.T) {
t.Parallel()
var (
wg1 sync.WaitGroup
wg2 sync.WaitGroup
timeout = 30 * time.Second
blackOut = 1 * time.Second
data = []byte("test data")
endpoint = getSocketName()
)
// Start KMS Plugin
f, err := startFakeKMSProvider(kmsapiVersion)
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
@ -93,8 +194,9 @@ func TestIntermittentConnectionLoss(t *testing.T) {
t.Log("Connected to KMSPlugin")
// Stop KMS Plugin - simulating connection loss
t.Log("KMS Plugin is stopping")
f.server.Stop()
t.Log("KMS Plugin is stopped")
time.Sleep(2 * time.Second)
wg1.Add(1)
wg2.Add(1)
@ -112,7 +214,7 @@ func TestIntermittentConnectionLoss(t *testing.T) {
wg1.Wait()
time.Sleep(blackOut)
// Start KMS Plugin
f, err = startFakeKMSProvider(kmsapiVersion)
f, err = startFakeKMSProvider(kmsapiVersion, endpoint)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
@ -123,11 +225,13 @@ func TestIntermittentConnectionLoss(t *testing.T) {
}
func TestUnsupportedVersion(t *testing.T) {
t.Parallel()
ver := "invalid"
data := []byte("test data")
wantErr := fmt.Errorf(versionErrorf, ver, kmsapiVersion)
endpoint := getSocketName()
f, err := startFakeKMSProvider(ver)
f, err := startFakeKMSProvider(ver, endpoint)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %ver", err)
}
@ -162,8 +266,10 @@ func TestUnsupportedVersion(t *testing.T) {
// Normal encryption and decryption operation.
func TestGRPCService(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
f, err := startFakeKMSProvider(kmsapiVersion)
endpoint := getSocketName()
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
@ -196,8 +302,10 @@ func TestGRPCService(t *testing.T) {
// Normal encryption and decryption operation by multiple go-routines.
func TestGRPCServiceConcurrentAccess(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
f, err := startFakeKMSProvider(kmsapiVersion)
endpoint := getSocketName()
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
@ -245,10 +353,15 @@ func destroyService(service Service) {
}
}
func getSocketName() string {
return fmt.Sprintf("unix:///@%s.sock", uuid.NewUUID())
}
// Test all those invalid configuration for KMS provider.
func TestInvalidConfiguration(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
f, err := startFakeKMSProvider(kmsapiVersion)
f, err := startFakeKMSProvider(kmsapiVersion, getSocketName())
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
@ -275,7 +388,7 @@ func TestInvalidConfiguration(t *testing.T) {
}
// Start the gRPC server that listens on unix socket.
func startFakeKMSProvider(version string) (*fakeKMSPlugin, error) {
func startFakeKMSProvider(version, endpoint string) (*fakeKMSPlugin, error) {
sockFile, err := parseEndpoint(endpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse endpoint:%q, error %v", endpoint, err)