From 564d53f71b3fb52e956e4bece12483e4ba3f6248 Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Tue, 6 Mar 2018 11:20:46 +0800 Subject: [PATCH 1/3] optimize requestcontext: use RWMutex and atomic.Value --- hack/.golint_failures | 1 + .../pkg/endpoints/request/context.go | 4 +- .../pkg/endpoints/request/requestcontext.go | 57 ++++++++++++++----- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/hack/.golint_failures b/hack/.golint_failures index 429d70e54cf..a56f2901b2c 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -543,6 +543,7 @@ staging/src/k8s.io/apiserver/pkg/endpoints/handlers staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation staging/src/k8s.io/apiserver/pkg/endpoints/metrics staging/src/k8s.io/apiserver/pkg/endpoints/openapi/testing +staging/src/k8s.io/apiserver/pkg/endpoints/request staging/src/k8s.io/apiserver/pkg/endpoints/testing staging/src/k8s.io/apiserver/pkg/features staging/src/k8s.io/apiserver/pkg/registry/generic diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/context.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/context.go index c4665ea3338..07ba3ac76b5 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/context.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/context.go @@ -18,7 +18,7 @@ package request import ( "context" - stderrs "errors" + "errors" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,7 +83,7 @@ func NewDefaultContext() Context { func WithValue(parent Context, key interface{}, val interface{}) Context { internalCtx, ok := parent.(context.Context) if !ok { - panic(stderrs.New("Invalid context type")) + panic(errors.New("Invalid context type")) } return context.WithValue(internalCtx, key, val) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext.go index 3dc771bafac..4f3b1af79a1 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext.go @@ -20,6 +20,7 @@ import ( "errors" "net/http" "sync" + "sync/atomic" "github.com/golang/glog" ) @@ -40,37 +41,62 @@ type RequestContextMapper interface { } type requestContextMap struct { - contexts map[*http.Request]Context - lock sync.Mutex + // contexts contains a request Context map + // atomic.Value has a very good read performance compared to sync.RWMutex + // almost all requests have 3-4 context updates associated with them, + // and they can use only read lock to protect updating context, which is of higher performance with higher burst. + contexts map[*http.Request]*atomic.Value + lock sync.RWMutex } // NewRequestContextMapper returns a new RequestContextMapper. // The returned mapper must be added as a request filter using NewRequestContextFilter. func NewRequestContextMapper() RequestContextMapper { return &requestContextMap{ - contexts: make(map[*http.Request]Context), + contexts: make(map[*http.Request]*atomic.Value), } } +func (c *requestContextMap) getValue(req *http.Request) (*atomic.Value, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + value, ok := c.contexts[req] + return value, ok +} + +// contextWrap is a wrapper of Context to prevent atomic.Value to be copied +type contextWrap struct { + Context +} + // Get returns the context associated with the given request (if any), and true if the request has an associated context, and false if it does not. // Get will only return a valid context when called from inside the filter chain set up by NewRequestContextFilter() func (c *requestContextMap) Get(req *http.Request) (Context, bool) { - c.lock.Lock() - defer c.lock.Unlock() - context, ok := c.contexts[req] - return context, ok + value, ok := c.getValue(req) + if !ok { + return nil, false + } + + if context, ok := value.Load().(contextWrap); ok { + return context.Context, ok + } + + return nil, false } // Update maps the request to the given context. // If no context was previously associated with the request, an error is returned and the context is ignored. func (c *requestContextMap) Update(req *http.Request, context Context) error { - c.lock.Lock() - defer c.lock.Unlock() - if _, ok := c.contexts[req]; !ok { - return errors.New("No context associated") + value, ok := c.getValue(req) + if !ok { + return errors.New("no context associated") } - // TODO: ensure the new context is a descendant of the existing one - c.contexts[req] = context + wrapper, ok := value.Load().(contextWrap) + if !ok { + return errors.New("value type does not match") + } + wrapper.Context = context + value.Store(wrapper) return nil } @@ -83,7 +109,10 @@ func (c *requestContextMap) init(req *http.Request, context Context) bool { if _, exists := c.contexts[req]; exists { return false } - c.contexts[req] = context + + value := &atomic.Value{} + value.Store(contextWrap{context}) + c.contexts[req] = value return true } From 3f73ccfcde3bc1ff9bb24d3ec9f3154feb97e166 Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Wed, 7 Mar 2018 11:44:36 +0800 Subject: [PATCH 2/3] add test case for request context mapper --- .../pkg/endpoints/request/context_test.go | 35 ++-- .../endpoints/request/requestcontext_test.go | 154 ++++++++++++++++++ 2 files changed, 171 insertions(+), 18 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/context_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/context_test.go index 365cb114e44..6106f1ca882 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/context_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/context_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package request_test +package request import ( "testing" @@ -22,13 +22,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/authentication/user" - genericapirequest "k8s.io/apiserver/pkg/endpoints/request" ) // TestNamespaceContext validates that a namespace can be get/set on a context object func TestNamespaceContext(t *testing.T) { - ctx := genericapirequest.NewDefaultContext() - result, ok := genericapirequest.NamespaceFrom(ctx) + ctx := NewDefaultContext() + result, ok := NamespaceFrom(ctx) if !ok { t.Fatalf("Error getting namespace") } @@ -36,8 +35,8 @@ func TestNamespaceContext(t *testing.T) { t.Fatalf("Expected: %s, Actual: %s", metav1.NamespaceDefault, result) } - ctx = genericapirequest.NewContext() - result, ok = genericapirequest.NamespaceFrom(ctx) + ctx = NewContext() + result, ok = NamespaceFrom(ctx) if ok { t.Fatalf("Should not be ok because there is no namespace on the context") } @@ -45,12 +44,12 @@ func TestNamespaceContext(t *testing.T) { //TestUserContext validates that a userinfo can be get/set on a context object func TestUserContext(t *testing.T) { - ctx := genericapirequest.NewContext() - _, ok := genericapirequest.UserFrom(ctx) + ctx := NewContext() + _, ok := UserFrom(ctx) if ok { t.Fatalf("Should not be ok because there is no user.Info on the context") } - ctx = genericapirequest.WithUser( + ctx = WithUser( ctx, &user.DefaultInfo{ Name: "bob", @@ -60,7 +59,7 @@ func TestUserContext(t *testing.T) { }, ) - result, ok := genericapirequest.UserFrom(ctx) + result, ok := UserFrom(ctx) if !ok { t.Fatalf("Error getting user info") } @@ -96,16 +95,16 @@ func TestUserContext(t *testing.T) { //TestUIDContext validates that a UID can be get/set on a context object func TestUIDContext(t *testing.T) { - ctx := genericapirequest.NewContext() - _, ok := genericapirequest.UIDFrom(ctx) + ctx := NewContext() + _, ok := UIDFrom(ctx) if ok { t.Fatalf("Should not be ok because there is no UID on the context") } - ctx = genericapirequest.WithUID( + ctx = WithUID( ctx, types.UID("testUID"), ) - _, ok = genericapirequest.UIDFrom(ctx) + _, ok = UIDFrom(ctx) if !ok { t.Fatalf("Error getting UID") } @@ -113,17 +112,17 @@ func TestUIDContext(t *testing.T) { //TestUserAgentContext validates that a useragent can be get/set on a context object func TestUserAgentContext(t *testing.T) { - ctx := genericapirequest.NewContext() - _, ok := genericapirequest.UserAgentFrom(ctx) + ctx := NewContext() + _, ok := UserAgentFrom(ctx) if ok { t.Fatalf("Should not be ok because there is no UserAgent on the context") } - ctx = genericapirequest.WithUserAgent( + ctx = WithUserAgent( ctx, "TestUserAgent", ) - result, ok := genericapirequest.UserAgentFrom(ctx) + result, ok := UserAgentFrom(ctx) if !ok { t.Fatalf("Error getting UserAgent") } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext_test.go new file mode 100644 index 00000000000..44d8f4857f0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestcontext_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "net/http" + "sync" + "testing" +) + +func TestRequestContextMapperGet(t *testing.T) { + mapper := NewRequestContextMapper() + context := NewContext() + req, _ := http.NewRequest("GET", "/api/version/resource", nil) + + // empty mapper + if _, ok := mapper.Get(req); ok { + t.Fatalf("got unexpected context") + } + + // init mapper + mapper.(*requestContextMap).init(req, context) + if _, ok := mapper.Get(req); !ok { + t.Fatalf("got no context") + } + + // remove request context + mapper.(*requestContextMap).remove(req) + if _, ok := mapper.Get(req); ok { + t.Fatalf("got unexpected context") + } + +} +func TestRequestContextMapperUpdate(t *testing.T) { + mapper := NewRequestContextMapper() + context := NewContext() + req, _ := http.NewRequest("GET", "/api/version/resource", nil) + + // empty mapper + if err := mapper.Update(req, context); err == nil { + t.Fatalf("got no error") + } + + // init mapper + if !mapper.(*requestContextMap).init(req, context) { + t.Fatalf("unexpected error, should init mapper") + } + + context = WithNamespace(context, "default") + if err := mapper.Update(req, context); err != nil { + t.Fatalf("unexpected error") + } + + if context, ok := mapper.Get(req); !ok { + t.Fatalf("go no context") + } else { + if ns, _ := NamespaceFrom(context); ns != "default" { + t.Fatalf("unexpected namespace %s", ns) + } + } +} + +func TestRequestContextMapperConcurrent(t *testing.T) { + mapper := NewRequestContextMapper() + + testCases := []struct{ url, namespace string }{ + {"/api/version/resource1", "ns1"}, + {"/api/version/resource2", "ns2"}, + {"/api/version/resource3", "ns3"}, + {"/api/version/resource4", "ns4"}, + {"/api/version/resource5", "ns5"}, + } + + wg := sync.WaitGroup{} + for _, testcase := range testCases { + wg.Add(1) + go func(testcase struct{ url, namespace string }) { + defer wg.Done() + context := NewContext() + req, _ := http.NewRequest("GET", testcase.url, nil) + + if !mapper.(*requestContextMap).init(req, context) { + t.Errorf("unexpected init error") + return + } + if _, ok := mapper.Get(req); !ok { + t.Errorf("got no context") + return + } + context2 := WithNamespace(context, testcase.namespace) + if err := mapper.Update(req, context2); err != nil { + t.Errorf("unexpected update error") + return + } + if context, ok := mapper.Get(req); !ok { + t.Errorf("got no context") + return + } else { + if ns, _ := NamespaceFrom(context); ns != testcase.namespace { + t.Errorf("unexpected namespace %s", ns) + return + } + } + }(testcase) + } + wg.Wait() +} + +func BenchmarkRequestContextMapper(b *testing.B) { + mapper := NewRequestContextMapper() + + b.SetParallelism(500) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + context := NewContext() + req, _ := http.NewRequest("GET", "/api/version/resource", nil) + + // 1 init + mapper.(*requestContextMap).init(req, context) + + // 5 Get + 4 Update + mapper.Get(req) + context = WithNamespace(context, "default1") + mapper.Update(req, context) + mapper.Get(req) + context = WithNamespace(context, "default2") + mapper.Update(req, context) + mapper.Get(req) + context = WithNamespace(context, "default3") + mapper.Update(req, context) + mapper.Get(req) + context = WithNamespace(context, "default4") + mapper.Update(req, context) + mapper.Get(req) + + // 1 remove + mapper.(*requestContextMap).remove(req) + } + }) +} From e5449d32f9a3c412598cae65192e7ea8c6cc3689 Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Wed, 7 Mar 2018 13:08:13 +0800 Subject: [PATCH 3/3] run update bazel --- .../apiserver/pkg/endpoints/request/BUILD | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/request/BUILD index 0155e98b0f5..30704b26e5a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/BUILD @@ -8,11 +8,17 @@ load( go_test( name = "go_default_test", - srcs = ["requestinfo_test.go"], + srcs = [ + "context_test.go", + "requestcontext_test.go", + "requestinfo_test.go", + ], embed = [":go_default_library"], deps = [ "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", ], ) @@ -35,17 +41,6 @@ go_library( ], ) -go_test( - name = "go_default_xtest", - srcs = ["context_test.go"], - deps = [ - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", - "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", - ], -) - filegroup( name = "package-srcs", srcs = glob(["**"]),